dictyBase/modware-user

View on GitHub
commands/server.go

Summary

Maintainability
F
4 days
Test Coverage
package commands

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "net"
    "net/http"
    "os"

    "github.com/dictyBase/apihelpers/aphgrpc"
    pb "github.com/dictyBase/go-genproto/dictybaseapis/user"
    "github.com/dictyBase/modware-user/server"
    "github.com/go-chi/cors"
    grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
    grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
    grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    _ "github.com/jackc/pgx/stdlib"
    "github.com/sirupsen/logrus"
    "github.com/soheilhy/cmux"
    "github.com/urfave/cli"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
    runner "gopkg.in/mgutz/dat.v2/sqlx-runner"
)

// RunRoleServer starts the role microservice: a gRPC server with the role
// service registered, plus its grpc-gateway HTTP front end, both multiplexed
// on the port taken from the "port" flag. The call blocks while the
// multiplexer is serving and returns a cli exit error (code 2) when setup or
// serving fails.
func RunRoleServer(c *cli.Context) error {
    return runMultiplexedServer(
        c,
        "role",
        func(s *grpc.Server, dbh *runner.DB) {
            pb.RegisterRoleServiceServer(
                s,
                server.NewRoleService(dbh, aphgrpc.BaseURLOption(setApiHost(c))),
            )
        },
        pb.RegisterRoleServiceHandlerFromEndpoint,
    )
}

// RunUserServer starts the user microservice: a gRPC server with the user
// service registered, plus its grpc-gateway HTTP front end, both multiplexed
// on the port taken from the "port" flag. The call blocks while the
// multiplexer is serving and returns a cli exit error (code 2) when setup or
// serving fails.
func RunUserServer(c *cli.Context) error {
    return runMultiplexedServer(
        c,
        "user",
        func(s *grpc.Server, dbh *runner.DB) {
            pb.RegisterUserServiceServer(
                s,
                server.NewUserService(dbh, aphgrpc.BaseURLOption(setApiHost(c))),
            )
        },
        pb.RegisterUserServiceHandlerFromEndpoint,
    )
}

// RunPermissionServer starts the permission microservice: a gRPC server with
// the permission service registered, plus its grpc-gateway HTTP front end,
// both multiplexed on the port taken from the "port" flag. The call blocks
// while the multiplexer is serving and returns a cli exit error (code 2) when
// setup or serving fails.
func RunPermissionServer(c *cli.Context) error {
    return runMultiplexedServer(
        c,
        "permission",
        func(s *grpc.Server, dbh *runner.DB) {
            pb.RegisterPermissionServiceServer(
                s,
                server.NewPermissionService(dbh, aphgrpc.BaseURLOption(setApiHost(c))),
            )
        },
        pb.RegisterPermissionServiceHandlerFromEndpoint,
    )
}

// runMultiplexedServer performs the setup shared by every Run*Server entry
// point and then serves until the multiplexer stops.
//
// name is the service name used in error messages. registerGRPC attaches the
// concrete service implementation to the gRPC server, receiving the database
// wrapper built from the command line flags. registerGateway is the generated
// Register<Service>HandlerFromEndpoint function that wires the HTTP gateway
// to the gRPC endpoint.
func runMultiplexedServer(
    c *cli.Context,
    name string,
    registerGRPC func(s *grpc.Server, dbh *runner.DB),
    registerGateway func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error,
) error {
    dbh, err := getPgWrapper(c)
    if err != nil {
        return cli.NewExitError(
            fmt.Sprintf("Unable to create database connection %s", err.Error()),
            2,
        )
    }
    grpcS := grpc.NewServer(
        grpc_middleware.WithUnaryServerChain(
            grpc_ctxtags.UnaryServerInterceptor(),
            grpc_logrus.UnaryServerInterceptor(getLogger(c)),
        ),
    )
    registerGRPC(grpcS, dbh)
    reflection.Register(grpcS)

    // http requests muxer
    runtime.HTTPError = aphgrpc.CustomHTTPError
    httpMux := runtime.NewServeMux(
        runtime.WithForwardResponseOption(aphgrpc.HandleCreateResponse),
    )
    endP := fmt.Sprintf(":%s", c.String("port"))
    // The gateway dials the same port that the gRPC server is multiplexed on.
    dialOpts := []grpc.DialOption{grpc.WithInsecure()}
    if err := registerGateway(context.Background(), httpMux, endP, dialOpts); err != nil {
        return cli.NewExitError(
            fmt.Sprintf("unable to register http endpoint for %s microservice %s", name, err),
            2,
        )
    }

    // create listener
    lis, err := net.Listen("tcp", endP)
    if err != nil {
        return cli.NewExitError(
            fmt.Sprintf("failed to listen %s", err),
            2,
        )
    }
    return serveMultiplexed(lis, grpcS, httpMux, endP)
}

// serveMultiplexed serves gRPC and plain HTTP traffic on the single listener
// lis, blocks until the multiplexer stops, then collects the exit status of
// both protocol servers. endP is only used for the startup log line.
func serveMultiplexed(lis net.Listener, grpcS *grpc.Server, handler http.Handler, endP string) error {
    // create the cmux object that will multiplex 2 protocols on same port
    m := cmux.New(lis)
    // match gRPC requests, otherwise regular HTTP requests
    // see https://github.com/grpc/grpc-go/issues/2636#issuecomment-472209287 for why we need to use MatchWithWriters()
    grpcL := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
    httpL := m.Match(cmux.Any())

    // CORS setup
    // NOTE(review): wildcard origins together with AllowCredentials is worth
    // confirming against the go-chi/cors documentation and browser behaviour.
    corsMW := cors.New(cors.Options{
        AllowedOrigins:     []string{"*"},
        AllowCredentials:   true,
        AllowedMethods:     []string{"GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"},
        OptionsPassthrough: false,
        AllowedHeaders:     []string{"*"},
    })
    httpS := &http.Server{Handler: corsMW.Handler(handler)}

    // collect on this channel the exits of each protocol's .Serve() call;
    // the buffer matches the number of senders so neither goroutine blocks
    ech := make(chan error, 2)
    // start the listeners for each protocol
    go func() { ech <- grpcS.Serve(grpcL) }()
    go func() { ech <- httpS.Serve(httpL) }()
    log.Printf("starting multiplexed  server on %s", endP)

    var failed bool
    if err := m.Serve(); err != nil {
        log.Printf("cmux serve error: %v", err)
        failed = true
    }
    // wait for exactly one result per protocol server
    for i := 0; i < cap(ech); i++ {
        if err := <-ech; err != nil {
            log.Printf("protocol serve error:%v", err)
            failed = true
        }
    }
    if failed {
        return cli.NewExitError("error in running cmux server", 2)
    }
    return nil
}

// getPgxDbHandler builds a key/value connection string from the
// "dictyuser-*" command line flags (user, pass, host, port, db), always
// appending sslmode=require, and opens a database/sql handle with the "pgx"
// driver.
func getPgxDbHandler(c *cli.Context) (*sql.DB, error) {
    const dsnFormat = "user=%s password=%s host=%s port=%s database=%s sslmode=require"
    dsn := fmt.Sprintf(
        dsnFormat,
        c.String("dictyuser-user"),
        c.String("dictyuser-pass"),
        c.String("dictyuser-host"),
        c.String("dictyuser-port"),
        c.String("dictyuser-db"),
    )
    return sql.Open("pgx", dsn)
}

// getPgWrapper opens the database handle via getPgxDbHandler and wraps it in
// a dat sqlx-runner DB using the "postgres" driver name. When the handle
// cannot be opened it returns a nil wrapper together with the error.
func getPgWrapper(c *cli.Context) (*runner.DB, error) {
    sqlDB, err := getPgxDbHandler(c)
    if err != nil {
        return nil, err
    }
    wrapped := runner.NewDB(sqlDB, "postgres")
    return wrapped, nil
}

// setApiHost returns the value of the "user-api-http-host" flag when it is
// non-empty; otherwise it falls back to http://localhost on the port given
// by the "port" flag.
func setApiHost(c *cli.Context) string {
    if host := c.String("user-api-http-host"); host != "" {
        return host
    }
    port := c.String("port")
    return fmt.Sprintf("http://localhost:%s", port)
}

// getLogger builds a logrus entry that writes to stderr. The global
// "log-format" flag selects the text or json formatter (both with the
// 02/Jan/2006:15:04:05 timestamp layout) and the global "log-level" flag
// selects one of debug, warn, error, fatal or panic. Any other flag value
// leaves the corresponding setting of the freshly created logger untouched.
func getLogger(c *cli.Context) *logrus.Entry {
    const stamp = "02/Jan/2006:15:04:05"

    logger := logrus.New()
    logger.Out = os.Stderr

    if format := c.GlobalString("log-format"); format == "text" {
        logger.Formatter = &logrus.TextFormatter{TimestampFormat: stamp}
    } else if format == "json" {
        logger.Formatter = &logrus.JSONFormatter{TimestampFormat: stamp}
    }

    // recognised level names; anything else keeps the logger's current level
    levels := map[string]logrus.Level{
        "debug": logrus.DebugLevel,
        "warn":  logrus.WarnLevel,
        "error": logrus.ErrorLevel,
        "fatal": logrus.FatalLevel,
        "panic": logrus.PanicLevel,
    }
    if lvl, known := levels[c.GlobalString("log-level")]; known {
        logger.Level = lvl
    }
    return logrus.NewEntry(logger)
}