dictyBase/modware-user

View on GitHub
commands/reply.go

Summary

Maintainability
A
40 mins
Test Coverage
package commands

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/dictyBase/go-genproto/dictybaseapis/pubsub"
    "github.com/dictyBase/modware-user/message"
    gclient "github.com/dictyBase/modware-user/message/grpc-client"
    "github.com/dictyBase/modware-user/message/nats"
    "github.com/sirupsen/logrus"
    "github.com/urfave/cli"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func shutdown(r message.Reply, logger *logrus.Entry) {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
    <-ch
    logger.Info("received kill signal")
    if err := r.Stop(); err != nil {
        logger.Fatalf("unable to close the subscription %s\n", err)
    }
    logger.Info("closed the connections gracefully")
}

func replyUser(subj string, c message.UserClient, req *pubsub.IdRequest) *pubsub.UserReply {
    switch subj {
    case "UserService.Get":
        u, err := c.Get(req.Id)
        if err != nil {
            st, _ := status.FromError(err)
            return &pubsub.UserReply{
                Status: st.Proto(),
                Exist:  false,
            }
        }
        return &pubsub.UserReply{
            Exist: true,
            User:  u,
        }
    case "UserService.Exist":
        exist, err := c.Exist(req.Id)
        if err != nil {
            st, _ := status.FromError(err)
            return &pubsub.UserReply{
                Status: st.Proto(),
                Exist:  exist,
            }
        }
        return &pubsub.UserReply{
            Exist: exist,
        }
    case "UserService.Delete":
        deleted, err := c.Delete(req.Id)
        if err != nil {
            st, _ := status.FromError(err)
            return &pubsub.UserReply{
                Status: st.Proto(),
                Exist:  deleted,
            }
        }
        return &pubsub.UserReply{
            Exist: deleted,
        }
    default:
        return &pubsub.UserReply{
            Status: status.Newf(codes.Internal, "subject %s is not supported", subj).Proto(),
        }
    }
}

func RunUserReply(c *cli.Context) error {
    reply, err := nats.NewReply(
        c.String("messaging-host"),
        c.String("messaging-port"),
    )
    if err != nil {
        return cli.NewExitError(
            fmt.Sprintf("cannot connect to reply server %s", err),
            2,
        )
    }
    conn, err := grpc.Dial(
        fmt.Sprintf("%s:%s", c.String("user-grpc-host"), c.String("user-grpc-port")),
        grpc.WithInsecure(),
    )
    if err != nil {
        return cli.NewExitError(
            fmt.Sprintf("cannot connect to grpc server for user microservice %s", err),
            2,
        )
    }
    defer conn.Close()
    err = reply.Start(
        "UserService.*",
        gclient.NewUserClient(conn),
        replyUser,
    )
    if err != nil {
        return cli.NewExitError(
            fmt.Sprintf("cannot start the reply server %s", err),
            2,
        )
    }
    logger := getLogger(c)
    logger.Info("starting the reply messaging backend")
    shutdown(reply, logger)
    return nil
}