current position:Home>grpc uses consul for service registration and discovery

grpc uses consul for service registration and discovery

2022-08-06 09:24:14 kankan231

目录结构:

 client/main.go:

package main

import (
	"context"
	"demo/config"
	"demo/proto"
	"fmt"
	"log"
	"time"

	_ "github.com/mbobakov/grpc-consul-resolver"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// main connects to the Greeter service discovered through Consul and sends
// up to 1000 SayHello requests, one per second, logging each reply.
//
// The process exits with a non-zero status if the connection cannot be
// created or if any call fails.
func main() {
	conn := InitSrvConn(config.ServiceName)
	if conn == nil {
		// InitSrvConn has already logged the underlying cause. Stop here
		// instead of calling through a nil client, which would panic.
		log.Fatalln("unable to create connection for service:", config.ServiceName)
	}

	err := sayHelloLoop(proto.NewGreeterClient(conn), 1000)

	// Close explicitly rather than with defer: log.Fatalln below would skip
	// deferred calls.
	if cerr := conn.Close(); cerr != nil {
		log.Println("closing connection:", cerr)
	}
	if err != nil {
		log.Fatalln(err)
	}
}

// sayHelloLoop issues count SayHello calls named "james0", "james1", ...,
// logging each reply and sleeping one second between calls. It returns the
// first call error it encounters.
func sayHelloLoop(srvClient proto.GreeterClient, count int) error {
	for i := 0; i < count; i++ {
		req := &proto.HelloRequest{Name: fmt.Sprintf("james%d", i)}
		resp, err := srvClient.SayHello(context.Background(), req)
		if err != nil {
			return err
		}
		log.Println(resp.Message)
		time.Sleep(time.Second)
	}
	return nil
}

// InitSrvConn dials the service named srvName through a "consul://" target
// built from config.ConsulIp and config.ConsulPort, which covers both service
// discovery and client-side load balancing (round robin) in one step.
//
// The connection uses insecure transport credentials. On a dial error the
// error is logged and nil is returned, so callers must check the result.
func InitSrvConn(srvName string) *grpc.ClientConn {
	// Target understood by the blank-imported grpc-consul-resolver package.
	target := fmt.Sprintf("consul://%s:%d/%s?healthy=true&wait=14s",
		config.ConsulIp, config.ConsulPort, srvName)

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Spread requests across the discovered instances in turn.
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
	}

	conn, err := grpc.Dial(target, opts...)
	if err == nil {
		return conn
	}
	log.Println("serviceLayer the service discovery error: ", err)
	return nil
}

proto/greet.proto

syntax = "proto3";
option go_package = ".;proto";

// Greeter is the greeting service used by the demo client and server.
service Greeter{
  // SayHello is a unary RPC: it takes one HelloRequest and returns one HelloReply.
  rpc SayHello(HelloRequest) returns (HelloReply);
}

// HelloRequest is the request message of Greeter.SayHello.
message HelloRequest{
  // name is the name to greet; the demo server echoes it in its reply.
  string name = 1;
}

// HelloReply is the response message of Greeter.SayHello.
message HelloReply{
  // message is the greeting text produced by the server.
  string message = 1;
}

server/main.go

package main

import (
	"context"
	"demo/proto"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"demo/config"

	"github.com/hashicorp/consul/api"
	uuid "github.com/satori/go.uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// Server is the stateless type whose SayHello method serves the Greeter RPC;
// main registers a *Server with the gRPC server.
type Server struct{}

// Command-line settings. Both pointers are assigned in main from the flag
// package and are dereferenced by SayHello when it builds its reply.
var (
	// Port is the TCP port from the -port flag.
	Port *int
	// IP is the address from the -ip flag.
	IP   *string
)

// SayHello answers a greeting request. The reply names the caller and the
// IP:port this instance was started with, which makes it visible which
// instance served each call.
func (s Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {
	greeting := fmt.Sprintf("hello %s from %s:%d", request.Name, *IP, *Port)
	reply := &proto.HelloReply{Message: greeting}
	return reply, nil
}

// main starts one Greeter gRPC server instance, registers it (with a gRPC
// health check) in Consul, and runs until SIGINT/SIGTERM arrives or the
// server stops serving. On the way out it deregisters the instance from
// Consul and, on a signal, stops the gRPC server gracefully.
func main() {
	// Address and port this instance listens on and advertises to Consul.
	// Author's note: changing the default IP to 127.0.0.1 makes the Consul
	// health check fail.
	IP = flag.String("ip", "192.168.168.6", "ip地址")
	Port = flag.Int("port", 8100, "端口号")

	flag.Parse()

	g := grpc.NewServer()

	proto.RegisterGreeterServer(g, &Server{})
	tcpAddr := fmt.Sprintf("%s:%d", *IP, *Port)
	log.Printf("service listen:%s", tcpAddr)
	lis, err := net.Listen("tcp", tcpAddr)
	if err != nil {
		log.Panicln("failed to listen:" + err.Error())
	}
	// Expose the standard gRPC health service so Consul's GRPC check can probe it.
	grpc_health_v1.RegisterHealthServer(g, health.NewServer())
	// Register this gRPC instance in Consul under a unique ID.
	serviceID := uuid.NewV4().String()
	client, err := RegisterGRPCService(*IP, config.ServiceName, serviceID, *Port, nil)
	if err != nil {
		log.Panicln("grpc服务注册失败:", err)
	}

	// Serve in the background so this goroutine can wait for a termination
	// signal. The result travels over a channel instead of a shared err
	// variable, which would be a data race with the code below.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- g.Serve(lis)
	}()

	// signal.Notify does not block when sending, so the channel must be
	// buffered or a signal arriving before the receive would be dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	serveFailed := false
	select {
	case sig := <-quit:
		log.Printf("received signal %v, shutting down", sig)
	case err := <-serveErr:
		log.Println("failed to start grpc:", err)
		serveFailed = true
	}

	// Deregister first so Consul stops handing out this instance.
	if err := client.Agent().ServiceDeregister(serviceID); err != nil {
		log.Println("Service logout failed:", err)
	} else {
		log.Println("Service log out successfully")
	}

	if serveFailed {
		os.Exit(1)
	}
	// Let in-flight RPCs finish before the process exits.
	g.GracefulStop()
}

// RegisterGRPCService registers a gRPC service instance, together with a gRPC
// health check, in the Consul agent at config.ConsulIp:config.ConsulPort.
//
// Parameters:
//   - address: IP address the service is reachable on
//   - name:    service name
//   - id:      unique service ID
//   - port:    service port
//   - tags:    service tags (may be nil)
//
// It returns the Consul client used for the registration so the caller can
// deregister later. On any failure it returns a nil client and a wrapped error.
func RegisterGRPCService(address, name, id string, port int, tags []string) (*api.Client, error) {
	cfg := api.DefaultConfig()
	// Address of the Consul agent. Author's note: unlike the service address
	// below, this one may be 127.0.0.1.
	cfg.Address = fmt.Sprintf("%s:%d", config.ConsulIp, config.ConsulPort)

	client, err := api.NewClient(cfg)
	if err != nil {
		// Report through the error return instead of panicking so the caller
		// decides how to react.
		return nil, fmt.Errorf("creating consul client for %s: %w", cfg.Address, err)
	}

	// Health check definition.
	check := &api.AgentServiceCheck{
		// Author's note: this IP must not be 127.0.0.1.
		GRPC:                           fmt.Sprintf("%s:%d", address, port), // address the service runs on
		Timeout:                        "5s",                                // a probe slower than this counts as unhealthy
		Interval:                       "5s",                                // probe every 5s
		DeregisterCriticalServiceAfter: "30s",                               // how long a failing service is kept before removal
	}

	// Service registration payload.
	registration := &api.AgentServiceRegistration{
		Name:    name,
		ID:      id,
		Address: address,
		Port:    port,
		Tags:    tags,
		Check:   check,
	}

	if err := client.Agent().ServiceRegister(registration); err != nil {
		return nil, fmt.Errorf("registering service %q (id %s): %w", name, id, err)
	}
	return client, nil
}

运行步骤:

1 启动consul

2 使用protoc工具生成greet.pb.go

3 启动多个服务端

4 启动客户端

运行结果大致如下,

 

copyright notice
author[kankan231],Please bring the original link to reprint, thank you.
https://en.chowdera.com/2022/218/202208060917043039.html

Random recommended