飞道的博客

服务治理【5】Golang实现弹性服务

257人阅读  评论(0)

一、本节前言:

  • 优点:在nginx实现负载均衡,需要手动配置upstream、proxy_pass参数字段,再重启生效;对于节点不可用,nginx虽有拥有节点错误自动降低权重。但是对新增节点却毫无办法,只能手动修改配置。
    本节的内容就是解决该问题。实现服务节点的弹性伸缩,对外使用统一API网关,对内采用多样负载均衡策略
  • 缺点:本节是基于服务端的发现,需要每个后端应用服务都将自己注册到zookeeper上。所以所有的应用app都需要修改代码。

    实现本节内容主要步陬有:
    1、封装zookeeper的增删改查、注册和发现等方法
    2、负载均衡策略对象(观察者对象)
    3、配置模块对象
    4、Real Server应用服务

二、封装zookeeper的增删改查、注册和发现等方法

请看此篇文章入门《Golang基于Zookeeper实现节点注册发现、内容配置变更》,代码也是沿用此文中封装的方法

三、负载均衡策略对象(观察者对象)

3.1 工厂类

package lbalance
type LbType int
const (
	LbRandom LbType = iota
	LbRoundRobin
)
// LoadBalance 负载均衡器
type LoadBalance interface {
   
	Add(...string) error        // 添加服务节点(params: 节点和权重)
	Get(string) (string, error) // 获取可用服务节点(params: 客户端地址或者uri路径,一般用于hash负载)
	Update()                    // 更新负载均衡器配置
}
// LoadBanlanceFactory 获取策略负载均衡器
func LoadBanlanceFactory(lbType LbType) LoadBalance {
   
	switch lbType {
   
	case LbRandom:
		return &RandomBalance{
   }
	case LbRoundRobin:
		return &RoundRobinBalance{
   }
	default:
		return &RandomBalance{
   }
	}
}

3.2 随机负载均衡器(观察者)

package lbalance
type RandomBalance struct {
   
	curIndex int
	rss      []string
	conf LoadBalanceConf  //观察主体
}
func (r *RandomBalance) Add(params ...string) error {
   
	if len(params) == 0 {
   
		return errors.New("param len 1 at least")
	}
	addr := params[0]
	r.rss = append(r.rss, addr)
	return nil
}
func (r *RandomBalance) Next() string {
   
	if len(r.rss) == 0 {
   
		return ""
	}
	r.curIndex = rand.Intn(len(r.rss))
	return r.rss[r.curIndex]
}
func (r *RandomBalance) Get(key string) (string, error) {
   
	return r.Next(), nil
}
func (r *RandomBalance) SetConf(conf LoadBalanceConf) {
   
	r.conf = conf
}
func (r *RandomBalance) Update() {
   
	if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
   
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = []string{
   }
		for _, ip := range conf.GetConf() {
   
			r.Add(strings.Split(ip, ",")...)
		}
	}
}

3.3 轮询负载器(观察者)

package lbalance
type RoundRobinBalance struct {
   
	curIndex int
	rss      []string
	conf LoadBalanceConf //观察主体
}
func (r *RoundRobinBalance) Add(params ...string) error {
   
	if len(params) == 0 {
   
		return errors.New("param len 1 at least")
	}
	addr := params[0]
	r.rss = append(r.rss, addr)
	return nil
}
func (r *RoundRobinBalance) Next() string {
   
	if len(r.rss) == 0 {
   
		return ""
	}
	lens := len(r.rss) //5
	if r.curIndex >= lens {
   
		r.curIndex = 0
	}
	curAddr := r.rss[r.curIndex]
	r.curIndex = (r.curIndex + 1) % lens
	return curAddr
}
func (r *RoundRobinBalance) Get(key string) (string, error) {
   
	return r.Next(), nil
}
func (r *RoundRobinBalance) SetConf(conf LoadBalanceConf) {
   
	r.conf = conf
}
func (r *RoundRobinBalance) Update() {
   
	if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
   
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = []string{
   }
		for _, ip := range conf.GetConf() {
   
			r.Add(strings.Split(ip, ",")...)
		}
	}

四、配置模块对象

package lbalance
import "demo/zookeeper"
type LoadBalanceConf interface {
   
	Attach(o Observer) // 增加观察者
	GetConf() []string // 获取最新服务节点
	WatchConf()        // 监听服务节点上下线
	NotifyObserver()   // 通知所有观察者更新服务节点
}
// LoadBalanceZkConf 存储在zookeeper中的负载均衡器列表
type LoadBalanceZkConf struct {
   
	observers    []Observer        // 观察者列表
	path         string            // 挂载点
	zkHosts      []string          // zookeeper节点列表
	confIPWeight map[string]string // ip权重
	activeList   []string          // 当前存活的服务列表
	format       string            // 格式
}
// Attach 增加观察者
func (c *LoadBalanceZkConf) Attach(o Observer) {
   
	c.observers = append(c.observers, o)
}
// GetConf 获取最新服务节点
func (c *LoadBalanceZkConf) GetConf() []string {
   
	fmt.Println(c.activeList)
	confs := []string{
   }
	for _, ip := range c.activeList {
   
		weight, ok := c.confIPWeight[ip]
		if !ok {
   
			weight = "50" // 默认权重
		}
		confs = append(confs, fmt.Sprintf(c.format, ip)+","+weight)
	}
	return confs
}
// WatchConf 监听挂载点的主机变动并更新存活列表和通知观察者
func (c *LoadBalanceZkConf) WatchConf() {
   
	fmt.Println("监听挂载点的主机变动并更新存活列表和通知观察者")
	zkDriver, _ := zookeeper.NewZKDriver(c.zkHosts)
	// 1.监听挂载点下主机列表的变动
	chanList, chanErr := zkDriver.WatchHostsByPath(c.path)
	go func() {
   
		defer zkDriver.Close()
		for {
   
			select {
   
			case changeErr := <-chanErr:
				fmt.Println("notify path occur error when watch conf, info: ", changeErr.Error())
			case changeList := <-chanList:
				fmt.Println("service list changed")
				c.activeList = changeList
				c.NotifyObserver()
			}
		}
	}()
}
// NotifyObserver 通知观察者,及时同步存活列表
func (c *LoadBalanceZkConf) NotifyObserver() {
   
	for _, o := range c.observers {
   
		o.Update()
	}
}
// NewLoadBalanceZkConf 从zk获取最新的节点数据并实例化
func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
   
	zkDriver, err := zookeeper.NewZKDriver(zkHosts)
	if err != nil {
   
		return nil, err
	}
	defer zkDriver.Close()
	// 1. 拉取最新数据
	srvs, err := zkDriver.GetListByPath(path)
	if err != nil {
   
		return nil, err
	}
	// 2. 开启同步数据
	mConf := &LoadBalanceZkConf{
   format: format, activeList: srvs, confIPWeight: conf, zkHosts: zkHosts, path: path}
	mConf.WatchConf()
	return mConf, nil
}

// Observer 观察者接口(就是给负载均衡器用滴)
type Observer interface {
   
	Update() // 更新(获取最新配置数据)
}

五、Real Server应用服务

package main
import "demo/zookeeper"

type RealServer struct {
   
	Addr string
}
func (r *RealServer) Run() {
   
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
   
		io.WriteString(rw, fmt.Sprintf("hit %s\n", r.Addr))
	})
	server := &http.Server{
   Addr: r.Addr, Handler: mux, WriteTimeout: time.Second * 3}
	go func() {
   
		// 1.获取zookeeper句柄
		zkDriver, err := zookeeper.NewZKDriver([]string{
   "127.0.0.1:2181"})
		if err != nil {
   
			log.Fatal(2001, err)
		}
		defer zkDriver.Close()
		// 2.将主机挂载到根下
		if err := zkDriver.RegistHostOnPath("/real_server", r.Addr); err != nil {
   
			log.Fatal(2002, err)
		}
		// 3.打印当前挂载点下的所有主机
		hosts, err := zkDriver.GetListByPath("/real_server")
		if err != nil {
   
			log.Fatal(2003, err)
		}
		fmt.Println("print current list in root: ", hosts)
		// 3.启动应用服务
		log.Fatal(server.ListenAndServe())
	}()
	log.Println("Starting httpserver at " + r.Addr)
}

func main() {
   
	server1 := &RealServer{
   Addr: "127.0.0.1:8888"}
	server2 := &RealServer{
   Addr: "127.0.0.1:6666"}
	server1.Run()
	time.Sleep(time.Second)
	server2.Run()
	//监听关闭信号
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

六、测试


转载:https://blog.csdn.net/qq_38900565/article/details/116502758
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场