proxyhandler.go
package j8a
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"io/ioutil"
"net/http"
"strings"
"time"
)
// XRequestID is a per HTTP request unique identifier
const XRequestID = "X-Request-Id"
const contentEncoding = "Content-Encoding"
const transferEncoding = "Transfer-Encoding"
const acceptEncoding = "Accept-Encoding"
const contentLength = "Content-Length"
const date = "Date"
const server = "Server"
// httpClient is the global user agent for upstream requests
var httpClient HTTPClient
// httpHeadersNoRewrite contains a list of headers that are not copied in either direction. they must be set by the
// server or are ignored.
var httpHeadersNoRewrite []string = []string{connectionS, date, contentLength, acceptEncoding, transferEncoding, server, varyS}
// extract IPs for stdout. thread safe.
var ipr iprex = iprex{}
func httpHandler(response http.ResponseWriter, request *http.Request) {
proxyHandler(response, request, handleHTTP)
}
const badOrMalFormedRequest = "bad or malformed request"
const jwtBearerTokenMissing = "jwt bearer token missing, invalid, expired or unauthorized"
const unableToMapUpstreamResource = "unable to map upstream resource"
const upstreamResourceNotFound = "upstream resource not found"
const httpRequestEntityTooLarge = "http request entity too large, limit is %d bytes"
const downstreamRequestAbortedBeforeFirstUpstream = "downstream request aborted or timed out before first upstream attempt"
var httpRequestInvalidAcceptEncoding string
func formatInvalidAcceptEncoding() string {
if len(httpRequestInvalidAcceptEncoding) == 0 {
httpRequestInvalidAcceptEncoding = fmt.Sprintf("http request content encoding header must contain one of %v", SupportedContentEncodings.Print())
}
return httpRequestInvalidAcceptEncoding
}
func proxyHandler(response http.ResponseWriter, request *http.Request, exec proxyfunc) {
//preprocess incoming request in proxy object
proxy := new(Proxy).
setOutgoing(response).
parseIncoming(request)
//all malformed requests are rejected here and we return a 400
if !validate(proxy) {
if proxy.Dwn.ReqTooLarge {
sendStatusCodeAsJSON(proxy.respondWith(413, fmt.Sprintf(httpRequestEntityTooLarge, Runner.Connection.Downstream.MaxBodyBytes)))
} else if !proxy.Dwn.AcceptEncoding.hasAtLeastOneValidEncoding() {
sendStatusCodeAsJSON(proxy.respondWith(406, formatInvalidAcceptEncoding()))
} else {
sendStatusCodeAsJSON(proxy.respondWith(400, badOrMalFormedRequest))
}
return
}
//if we timed out or aborted during downstream request parsing we stop the handler before the first upstream attempt.
if proxy.hasDownstreamAbortedOrTimedout() {
infoOrTraceEv(proxy).Str(path, proxy.Dwn.Path).
Str(method, proxy.Dwn.Method).
Int64(dwnElpsdMicros, time.Since(proxy.Dwn.startDate).Microseconds()).
Str(XRequestID, proxy.XRequestID).
Msg(downstreamRequestAbortedBeforeFirstUpstream)
sendStatusCodeAsJSON(proxy)
return
}
if matchRoutes(request, proxy) {
if proxy.Route.hasJwt() && !proxy.validateJwt() {
sendStatusCodeAsJSON(proxy.respondWith(401, jwtBearerTokenMissing))
}
url, label, mapped := proxy.Route.mapURL(proxy)
if mapped {
//mapped requests are sent to proxyfuncs.
exec(proxy.firstAttempt(url, label))
} else {
//unmapped request means an internal configuration error in server
sendStatusCodeAsJSON(proxy.respondWith(503, unableToMapUpstreamResource))
}
} else {
sendStatusCodeAsJSON(proxy.respondWith(404, upstreamResourceNotFound))
}
}
// TODO this needs a new order. Exact matches first, then prefix matches. inside all these, longer matches first, then shorter ones.
// try a custom sorter for routes.
func matchRoutes(request *http.Request, proxy *Proxy) bool {
matched := false
for _, route := range Runner.Routes {
if matched = route.match(request); matched {
proxy.setRoute(&route)
break
}
}
return matched
}
func validate(proxy *Proxy) bool {
return proxy.hasLegalHTTPMethod() &&
!proxy.Dwn.ReqTooLarge &&
proxy.Dwn.AcceptEncoding.hasAtLeastOneValidEncoding()
}
const connectionClosedByRemoteUserAgent = "downstream remote user agent aborted request or closed connection"
const gatewayTimeoutTriggeredByDownstreamEvent = "gateway timeout triggered by downstream timeout"
const gatewayTimeoutTriggeredByUpstreamEvent = "gateway timeout triggered by upstream attempt"
const badGatewayTriggeredUnableToProcessUpstreamResponse = "bad gateway triggered. unable to process upstream response"
func handleHTTP(proxy *Proxy) {
upstreamResponse, upstreamError := performUpstreamRequest(proxy)
if upstreamResponse != nil && upstreamResponse.Body != nil {
defer upstreamResponse.Body.Close()
}
if !processUpstreamResponse(proxy, upstreamResponse, upstreamError) {
if proxy.shouldRetryUpstreamAttempt() {
handleHTTP(proxy.nextAttempt())
} else {
//sends 504 for downstream timeout, 504 for upstream timeout, 499 for downstream remote hangup,
//502 in all other cases
if proxy.Dwn.TimeoutFlag == true {
sendStatusCodeAsJSON(proxy.respondWith(504, gatewayTimeoutTriggeredByDownstreamEvent))
} else if proxy.Dwn.AbortedFlag == true {
sendStatusCodeAsJSON(proxy.respondWith(499, connectionClosedByRemoteUserAgent))
} else if proxy.hasUpstreamAttemptAborted() {
sendStatusCodeAsJSON(proxy.respondWith(504, gatewayTimeoutTriggeredByUpstreamEvent))
} else {
sendStatusCodeAsJSON(proxy.respondWith(502, badGatewayTriggeredUnableToProcessUpstreamResponse))
}
}
}
}
const upstreamURIResolved = "upstream URI resolved"
const keepAlive = "Keep-Alive"
func scaffoldUpstreamRequest(proxy *Proxy) *http.Request {
//this context is used to time out the upstream request
ctx, cancel := context.WithCancel(context.TODO())
//remember the cancelFunc, we may need to call it before it times out from the outside
proxy.Up.Atmpt.CancelFunc = cancel
//will call the cancel func in it's own goroutine after timeout seconds.
time.AfterFunc(time.Duration(Runner.Connection.Upstream.ReadTimeoutSeconds)*time.Second, func() {
cancel()
})
upURI := proxy.resolveUpstreamURI()
upstreamRequest, _ := http.NewRequestWithContext(ctx,
proxy.Dwn.Method,
upURI,
proxy.bodyReader())
infoOrTraceEv(proxy).Str(dwnReqPath, proxy.Dwn.Path).
Int64(dwnElpsdMicros, time.Since(proxy.Dwn.startDate).Microseconds()).
Str(XRequestID, proxy.XRequestID).
Str(upReqURI, upURI).
Msg(upstreamURIResolved)
proxy.Up.Atmpt.Aborted = upstreamRequest.Context().Done()
//this contains all accept encodings for content negotiation but is guaranteed to have one valid value.
upstreamRequest.Header.Add(acceptEncoding, proxy.Dwn.AcceptEncoding.Print())
//set upstream headers
for key, values := range proxy.Dwn.Req.Header {
if shouldProxyHeader(key) {
for _, value := range values {
upstreamRequest.Header.Add(key, value)
}
}
}
//this is redundant for HTTP/1.1, spec ref: https://datatracker.ietf.org/doc/html/rfc2616#section-8.1.3
//upstreamRequest.Header.Set(connectionS, keepAlive)
upstreamRequest.Header.Set(XRequestID, proxy.XRequestID)
return upstreamRequest
}
const upResHeaders = "upResHeaders"
const upstreamReqAborted = "upstream request processing aborted"
const upstreamResHeaderAborted = "upstream response header processing aborted"
const upstreamResHeadersProcessed = "upstream response headers processed"
const upConReadTimeoutFired = "upstream connection read timeout fired, aborting upstream response header processing."
const safeToIgnoreFailedHeaderChannelClosure = "safe to ignore. recovered internally from closed header success channel after request already handled."
const downstreamRtFired = "downstream roundtrip timeout fired"
const downstreamReqAborted = "downstream request aborted"
func performUpstreamRequest(proxy *Proxy) (*http.Response, error) {
//get a reference to this before any race conditions may occur
attemptIndex := proxy.Up.Count - 1
req := scaffoldUpstreamRequest(proxy)
var upstreamResponse *http.Response
var upstreamError error
go func() {
defer func() {
//this should never happen in production, http client doesn't panic
//if it does, abort only one request as opposed to shutting down the server.
if err := recover(); err != nil {
upstreamError = errors.New(upstreamReqAborted)
defer func() {
if err := recover(); err != nil {
//ignore
}
}()
proxy.Up.Atmpts[attemptIndex].CancelFunc()
}
}()
//this blocks until upstream headers come in
upstreamResponse, upstreamError = httpClient.Do(req)
proxy.Up.Atmpt.resp = upstreamResponse
if proxy.Up.Atmpts[attemptIndex].CompleteHeader != nil &&
!proxy.Up.Atmpts[attemptIndex].AbortedFlag &&
!proxy.Dwn.AbortedFlag {
close(proxy.Up.Atmpts[attemptIndex].CompleteHeader)
} else {
panic(upstreamReqAborted)
}
}()
//race for upstream headers complete, upstream timeout or downstream abort (timeout or cancellation)
select {
case <-proxy.Up.Atmpt.Aborted:
proxy.Up.Atmpt.AbortedFlag = true
proxy.Up.Atmpt.StatusCode = 0
//aborts due to timeout don't set upstream error
if upstreamError == nil {
scaffoldUpAttemptLog(proxy).
Int(upReadTimeoutSecs, Runner.Connection.Upstream.ReadTimeoutSeconds).
Msg(upConReadTimeoutFired)
} else {
scaffoldUpAttemptLog(proxy).
Msg(upstreamReqAborted)
}
case <-proxy.Dwn.Timeout:
proxy.Dwn.TimeoutFlag = true
scaffoldUpAttemptLog(proxy).
Msg(downstreamRtFired)
proxy.abortAllUpstreamAttempts()
scaffoldUpAttemptLog(proxy).
Msg(upstreamResHeaderAborted)
case <-proxy.Dwn.Aborted:
proxy.Dwn.AbortedFlag = true
scaffoldUpAttemptLog(proxy).
Msg(downstreamReqAborted)
proxy.abortAllUpstreamAttempts()
scaffoldUpAttemptLog(proxy).
Msg(upstreamResHeaderAborted)
case <-proxy.Up.Atmpt.CompleteHeader:
scaffoldUpAttemptLog(proxy).
RawJSON(upResHeaders, jsonifyUpstreamHeaders(proxy)).
Msg(upstreamResHeadersProcessed)
}
return upstreamResponse, upstreamError
}
const upReadTimeoutSecs = "upReadTimeoutSecs"
const safeToIgnoreFailedBodyChannelClosure = "safe to ignore. recovered internally from closed body success channel after request already handled."
const upstreamConReadTimeoutFired = "upstream connection read timeout fired, aborting upstream response body processing"
const upResBodyBytes = "upResBodyBytes"
const upstreamResParseAbort = "upstream response parsing aborted"
const upstreamResBodyAbort = "upstream response body processing aborted"
const upstreamResBodyProcessed = "upstream response body processed"
const emptyJSON = "{}"
func jsonifyUpstreamHeaders(proxy *Proxy) []byte {
if proxy.Up.Atmpt == nil || proxy.Up.Atmpt.resp == nil || proxy.Up.Atmpt.resp.Header == nil {
return []byte(emptyJSON)
}
//catch all
jsonb, err := json.Marshal(proxy.Up.Atmpt.resp.Header)
if err != nil {
jsonb = []byte(emptyJSON)
}
return jsonb
}
const upAtmptResBodyTrunc = "upAtmptResBodyTrunc"
const upAtmptCntntEnc = "upAtmptCntntEnc"
const more = "..."
const moreEncoded = " [encoded]"
func parseUpstreamResponse(upstreamResponse *http.Response, proxy *Proxy) ([]byte, error) {
var upstreamResponseBody []byte
var bodyError error
//get a reference to this before any race conditions may occur
attemptIndex := proxy.Up.Count - 1
go func() {
defer func() {
if err := recover(); err != nil {
bodyError = errors.New(upstreamResParseAbort)
defer func() {
if err := recover(); err != nil {
//ignore
}
}()
//this matters because current Up.Atmpt may have moved on
proxy.Up.Atmpts[attemptIndex].CancelFunc()
}
}()
if upstreamResponse != nil {
proxy.Up.Atmpt.StatusCode = upstreamResponse.StatusCode
if proxy.Up.Atmpt != nil &&
proxy.Up.Atmpt.resp != nil &&
proxy.Up.Atmpt.resp.Header != nil {
proxy.Up.Atmpt.ContentEncoding = NewContentEncoding(proxy.Up.Atmpt.resp.Header.Get(contentEncoding))
}
upstreamResponseBody, bodyError = ioutil.ReadAll(upstreamResponse.Body)
} else {
panic(upstreamResParseAbort)
}
//this is ok, see: https://stackoverflow.com/questions/8593645/is-it-ok-to-leave-a-channel-open#:~:text=5%20Answers&text=It's%20OK%20to%20leave%20a,it%20will%20be%20garbage%20collected.&text=Closing%20the%20channel%20is%20a,that%20no%20more%20data%20follows.
if proxy.Up.Atmpt.CompleteBody != nil &&
!proxy.Up.Atmpt.AbortedFlag &&
!proxy.Dwn.AbortedFlag {
close(proxy.Up.Atmpts[attemptIndex].CompleteBody)
} else {
panic(upstreamResParseAbort)
}
}()
select {
case <-proxy.Up.Atmpt.Aborted:
proxy.Up.Atmpt.AbortedFlag = true
if bodyError == nil {
scaffoldUpAttemptLog(proxy).
Int(upReadTimeoutSecs, Runner.Connection.Upstream.ReadTimeoutSeconds).
Msg(upstreamConReadTimeoutFired)
} else {
scaffoldUpAttemptLog(proxy).
Msg(upstreamResParseAbort)
}
case <-proxy.Dwn.Timeout:
proxy.Dwn.TimeoutFlag = true
scaffoldUpAttemptLog(proxy).
Msg(downstreamRtFired)
proxy.abortAllUpstreamAttempts()
scaffoldUpAttemptLog(proxy).
Msg(upstreamResBodyAbort)
case <-proxy.Dwn.Aborted:
proxy.Dwn.AbortedFlag = true
scaffoldUpAttemptLog(proxy).
Msg(downstreamReqAborted)
proxy.abortAllUpstreamAttempts()
scaffoldUpAttemptLog(proxy).
Msg(upstreamResBodyAbort)
case <-proxy.Up.Atmpt.CompleteBody:
ul := scaffoldUpAttemptLog(proxy)
//truncate body for logging
var t []byte
if len(upstreamResponseBody) > 25 {
t = append(t, upstreamResponseBody[0:25]...)
} else {
t = upstreamResponseBody
}
//and show what is necessary depending on encoding
if proxy.Up.Atmpt.ContentEncoding.isEncoded() {
s := hex.EncodeToString(t)
if len(s) == 50 {
s += more
}
ul.Str(upAtmptResBodyTrunc, s+moreEncoded)
} else {
s := string(t)
if len(s) == 25 {
s += more
}
ul.Str(upAtmptResBodyTrunc, s)
}
if len(proxy.Up.Atmpt.ContentEncoding) > 0 {
ul.Str(upAtmptCntntEnc, proxy.Up.Atmpt.ContentEncoding.print())
}
ul.Int(upResBodyBytes, len(upstreamResponseBody)).
Msg(upstreamResBodyProcessed)
}
return upstreamResponseBody, bodyError
}
func processUpstreamResponse(proxy *Proxy, upstreamResponse *http.Response, upstreamError error) bool {
//process only if we can work with upstream attempt
if upstreamResponse != nil && upstreamError == nil && !proxy.hasUpstreamAttemptAborted() {
//j8a blocks here when waiting for upstream body
upstreamResponseBody, bodyError := parseUpstreamResponse(upstreamResponse, proxy)
upstreamError = bodyError
proxy.Up.Atmpt.respBody = &upstreamResponseBody
if shouldProxyUpstreamResponse(proxy, bodyError) {
logSuccessfulUpstreamAttempt(proxy, upstreamResponse)
if isUpstreamClientError(proxy) {
proxy.copyUpstreamStatusCodeHeader()
sendStatusCodeAsJSON(proxy)
} else {
proxy.writeStandardResponseHeaders()
proxy.copyUpstreamResponseHeaders()
proxy.copyUpstreamStatusCodeHeader()
proxy.encodeUpstreamResponseBody()
proxy.setContentLengthHeader()
proxy.sendDownstreamStatusCodeHeader()
proxy.pipeDownstreamResponse()
logHandledDownstreamRoundtrip(proxy)
}
return true
}
}
//now log unsuccessful and retry or exit with status Code.
logUnsuccessfulUpstreamAttempt(proxy, upstreamResponse, upstreamError)
return false
}
func isUpstreamClientError(proxy *Proxy) bool {
return proxy.Up.Atmpt.StatusCode > 399 && proxy.Up.Atmpt.StatusCode < 500
}
func shouldProxyUpstreamResponse(proxy *Proxy, bodyError error) bool {
return !proxy.hasDownstreamAbortedOrTimedout() &&
!proxy.hasUpstreamAttemptAborted() &&
bodyError == nil &&
proxy.Up.Atmpt.resp.StatusCode < 500
}
func shouldProxyHeader(header string) bool {
for _, illegal := range httpHeadersNoRewrite {
if strings.EqualFold(header, illegal) {
return false
}
}
return true
}
func scaffoldUpAttemptLog(proxy *Proxy) *zerolog.Event {
return infoOrTraceEv(proxy).
Str(XRequestID, proxy.XRequestID).
Int64(upAtmtpElpsdMicros, time.Since(proxy.Up.Atmpt.startDate).Microseconds()).
Int64(dwnElpsdMicros, time.Since(proxy.Dwn.startDate).Microseconds()).
Str(upAtmpt, proxy.Up.Atmpt.print())
}
const downstreamResponseServed = "downstream HTTP response served"
const downstreamErrorResponseServed = "downstream HTTP error response served"
const pdS = "%d"
func logHandledDownstreamRoundtrip(proxy *Proxy) {
elapsed := time.Since(proxy.Dwn.startDate)
msg := downstreamResponseServed
ev := infoOrDebugEv(proxy)
if proxy.hasMadeUpstreamAttempt() {
ev = ev.Str(upReqURI, proxy.resolveUpstreamURI()).
Str(upLabel, proxy.Up.Atmpt.Label).
Int(upAtmptResCode, proxy.Up.Atmpt.StatusCode).
Int(upAtmptResBodyBytes, len(*proxy.Up.Atmpt.respBody)).
Int64(upAtmptElpsdMicros, time.Since(proxy.Up.Atmpt.startDate).Microseconds()).
Bool(upAtmptAbort, proxy.Up.Atmpt.AbortedFlag).
Str(upAtmpt, proxy.Up.Atmpt.print())
}
if proxy.Dwn.Resp.StatusCode > 399 {
msg = downstreamErrorResponseServed
//upgrade the message to warn for anything 400 and up
ev = log.Warn()
ev = ev.Str(dwnResErrMsg, proxy.Dwn.Resp.Message)
}
ev = ev.Str(dwnReqListnr, proxy.Dwn.Listener).
Str(dwnReqPort, fmt.Sprintf(pdS, proxy.Dwn.Port)).
Str(dwnReqHost, proxy.Dwn.Host).
Str(dwnReqPath, proxy.Dwn.Path).
Str(dwnReqRemoteAddr, ipr.extractAddr(proxy.Dwn.Req.RemoteAddr)).
Str(dwnReqMethod, proxy.Dwn.Method).
Str(dwnReqUserAgent, proxy.Dwn.UserAgent).
Str(dwnReqHttpVer, proxy.Dwn.HttpVer).
Int(dwnResCode, proxy.Dwn.Resp.StatusCode).
Int64(dwnResCntntLen, proxy.Dwn.Resp.ContentLength).
Int64(dwnResElpsdMicros, elapsed.Microseconds()).
Str(XRequestID, proxy.XRequestID)
//if content encoding is not set, i.e. for body less requests, do not log this field.
if len(proxy.Dwn.Resp.ContentEncoding) > 0 {
ev = ev.Str(dwnResCntntEnc, string(proxy.Dwn.Resp.ContentEncoding))
}
if Runner.isTLSOn() {
ev = ev.Str(dwnReqTlsVer, proxy.Dwn.TlsVer)
}
ev.Msg(msg)
}
const upstreamAttemptSuccessful = "upstream attempt successful"
const upstreamAttemptUnsuccessful = "upstream attempt unsuccessful, cause: "
func logSuccessfulUpstreamAttempt(proxy *Proxy, upstreamResponse *http.Response) {
scaffoldUpAttemptLog(proxy).
Int(upAtmptResCode, upstreamResponse.StatusCode).
Msg(upstreamAttemptSuccessful)
}
const undeterminedUpstreamError = "undetermined but raw error was: %v"
const upstreamHangup = "upstream TCP socket hung up on us remotely"
const eofS = "EOF"
func logUnsuccessfulUpstreamAttempt(proxy *Proxy, upstreamResponse *http.Response, upstreamError error) {
ev := scaffoldUpAttemptLog(proxy)
if upstreamResponse != nil && upstreamResponse.StatusCode > 0 {
ev = ev.Int(upAtmptResCode, upstreamResponse.StatusCode)
}
if upstreamError != nil && strings.Contains(upstreamError.Error(), eofS) {
ev.Msg(upstreamAttemptUnsuccessful + upstreamHangup)
} else {
ev.Msg(upstreamAttemptUnsuccessful + fmt.Sprintf(undeterminedUpstreamError, upstreamError))
}
}