一、本节前言:
- 优点:在nginx实现负载均衡,需要手动配置upstream、proxy_pass参数字段,再重启生效;对于节点不可用,nginx虽有拥有节点错误自动降低权重。但是对新增节点却毫无办法,只能手动修改配置。
本节的内容就是解决该问题。实现服务节点的弹性伸缩,对外使用统一API网关,对内采用多样负载均衡策略 - 缺点:本节是基于服务端的发现,需要每个后端应用服务都将自己注册到zookeeper上。所以所有的应用app都需要修改代码。
实现本节内容主要步陬有:
1、封装zookeeper的增删改查、注册和发现等方法
2、负载均衡策略对象(观察者对象)
3、配置模块对象
4、Real Server应用服务
二、封装zookeeper的增删改查、注册和发现等方法
请看此篇文章入门《Golang基于Zookeeper实现节点注册发现、内容配置变更》,代码也是沿用此文中封装的方法
三、负载均衡策略对象(观察者对象)
3.1 工厂类
package lbalance
type LbType int
const (
LbRandom LbType = iota
LbRoundRobin
)
// LoadBalance 负载均衡器
type LoadBalance interface {
Add(...string) error // 添加服务节点(params: 节点和权重)
Get(string) (string, error) // 获取可用服务节点(params: 客户端地址或者uri路径,一般用于hash负载)
Update() // 更新负载均衡器配置
}
// LoadBanlanceFactory 获取策略负载均衡器
func LoadBanlanceFactory(lbType LbType) LoadBalance {
switch lbType {
case LbRandom:
return &RandomBalance{
}
case LbRoundRobin:
return &RoundRobinBalance{
}
default:
return &RandomBalance{
}
}
}
3.2 随机负载均衡器(观察者)
package lbalance
type RandomBalance struct {
curIndex int
rss []string
conf LoadBalanceConf //观察主体
}
func (r *RandomBalance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
r.rss = append(r.rss, addr)
return nil
}
func (r *RandomBalance) Next() string {
if len(r.rss) == 0 {
return ""
}
r.curIndex = rand.Intn(len(r.rss))
return r.rss[r.curIndex]
}
func (r *RandomBalance) Get(key string) (string, error) {
return r.Next(), nil
}
func (r *RandomBalance) SetConf(conf LoadBalanceConf) {
r.conf = conf
}
func (r *RandomBalance) Update() {
if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = []string{
}
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
}
3.3 轮询负载器(观察者)
package lbalance
type RoundRobinBalance struct {
curIndex int
rss []string
conf LoadBalanceConf //观察主体
}
func (r *RoundRobinBalance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
r.rss = append(r.rss, addr)
return nil
}
func (r *RoundRobinBalance) Next() string {
if len(r.rss) == 0 {
return ""
}
lens := len(r.rss) //5
if r.curIndex >= lens {
r.curIndex = 0
}
curAddr := r.rss[r.curIndex]
r.curIndex = (r.curIndex + 1) % lens
return curAddr
}
func (r *RoundRobinBalance) Get(key string) (string, error) {
return r.Next(), nil
}
func (r *RoundRobinBalance) SetConf(conf LoadBalanceConf) {
r.conf = conf
}
func (r *RoundRobinBalance) Update() {
if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = []string{
}
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
四、配置模块对象
package lbalance
import "demo/zookeeper"
type LoadBalanceConf interface {
Attach(o Observer) // 增加观察者
GetConf() []string // 获取最新服务节点
WatchConf() // 监听服务节点上下线
NotifyObserver() // 通知所有观察者更新服务节点
}
// LoadBalanceZkConf 存储在zookeeper中的负载均衡器列表
type LoadBalanceZkConf struct {
observers []Observer // 观察者列表
path string // 挂载点
zkHosts []string // zookeeper节点列表
confIPWeight map[string]string // ip权重
activeList []string // 当前存活的服务列表
format string // 格式
}
// Attach 增加观察者
func (c *LoadBalanceZkConf) Attach(o Observer) {
c.observers = append(c.observers, o)
}
// GetConf 获取最新服务节点
func (c *LoadBalanceZkConf) GetConf() []string {
fmt.Println(c.activeList)
confs := []string{
}
for _, ip := range c.activeList {
weight, ok := c.confIPWeight[ip]
if !ok {
weight = "50" // 默认权重
}
confs = append(confs, fmt.Sprintf(c.format, ip)+","+weight)
}
return confs
}
// WatchConf 监听挂载点的主机变动并更新存活列表和通知观察者
func (c *LoadBalanceZkConf) WatchConf() {
fmt.Println("监听挂载点的主机变动并更新存活列表和通知观察者")
zkDriver, _ := zookeeper.NewZKDriver(c.zkHosts)
// 1.监听挂载点下主机列表的变动
chanList, chanErr := zkDriver.WatchHostsByPath(c.path)
go func() {
defer zkDriver.Close()
for {
select {
case changeErr := <-chanErr:
fmt.Println("notify path occur error when watch conf, info: ", changeErr.Error())
case changeList := <-chanList:
fmt.Println("service list changed")
c.activeList = changeList
c.NotifyObserver()
}
}
}()
}
// NotifyObserver 通知观察者,及时同步存活列表
func (c *LoadBalanceZkConf) NotifyObserver() {
for _, o := range c.observers {
o.Update()
}
}
// NewLoadBalanceZkConf 从zk获取最新的节点数据并实例化
func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
zkDriver, err := zookeeper.NewZKDriver(zkHosts)
if err != nil {
return nil, err
}
defer zkDriver.Close()
// 1. 拉取最新数据
srvs, err := zkDriver.GetListByPath(path)
if err != nil {
return nil, err
}
// 2. 开启同步数据
mConf := &LoadBalanceZkConf{
format: format, activeList: srvs, confIPWeight: conf, zkHosts: zkHosts, path: path}
mConf.WatchConf()
return mConf, nil
}
// Observer 观察者接口(就是给负载均衡器用滴)
type Observer interface {
Update() // 更新(获取最新配置数据)
}
五、Real Server应用服务
package main
import "demo/zookeeper"
type RealServer struct {
Addr string
}
func (r *RealServer) Run() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
io.WriteString(rw, fmt.Sprintf("hit %s\n", r.Addr))
})
server := &http.Server{
Addr: r.Addr, Handler: mux, WriteTimeout: time.Second * 3}
go func() {
// 1.获取zookeeper句柄
zkDriver, err := zookeeper.NewZKDriver([]string{
"127.0.0.1:2181"})
if err != nil {
log.Fatal(2001, err)
}
defer zkDriver.Close()
// 2.将主机挂载到根下
if err := zkDriver.RegistHostOnPath("/real_server", r.Addr); err != nil {
log.Fatal(2002, err)
}
// 3.打印当前挂载点下的所有主机
hosts, err := zkDriver.GetListByPath("/real_server")
if err != nil {
log.Fatal(2003, err)
}
fmt.Println("print current list in root: ", hosts)
// 3.启动应用服务
log.Fatal(server.ListenAndServe())
}()
log.Println("Starting httpserver at " + r.Addr)
}
func main() {
server1 := &RealServer{
Addr: "127.0.0.1:8888"}
server2 := &RealServer{
Addr: "127.0.0.1:6666"}
server1.Run()
time.Sleep(time.Second)
server2.Run()
//监听关闭信号
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
六、测试
转载:https://blog.csdn.net/qq_38900565/article/details/116502758
查看评论