consul的watcher源码
时间: 2023-12-03 09:53:39 浏览: 124
Consul Watcher 是 Consul 客户端中的一个组件,它允许客户端监听 Consul 服务端的变化。Watcher 的实现采用了观察者模式,当服务端发生变化时,Watcher 就会通知客户端做出相应的处理。
以下是 Consul Watcher 的源码实现:
```
type Watcher struct {
ID string
Client *Client
Consistency string
Options *QueryOptions
Ctx context.Context
Cancel context.CancelFunc
Handler WatchHandler
ErrHandler WatchErrorHandler
ErrChan chan error
UserContext interface{}
}
type WatcherOptions struct {
Consistency string
QueryOptions *QueryOptions
UserContext interface{}
}
type WatchHandler func(uint64, interface{})
type WatchErrorHandler func(error)
func (c *Client) NewWatcher(opts WatcherOptions) (*Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
watcher := &Watcher{
ID: uuid.New().String(),
Client: c,
Consistency: opts.Consistency,
Options: opts.QueryOptions,
Ctx: ctx,
Cancel: cancel,
ErrChan: make(chan error, 1),
UserContext: opts.UserContext,
}
go watcher.watch()
return watcher, nil
}
func (w *Watcher) watch() {
var lastIndex uint64
for {
index, res, err := w.Client.raw().Watch(w.Ctx, w.build(), lastIndex)
if err != nil {
w.ErrChan <- err
continue
}
if lastIndex == 0 {
lastIndex = index
} else if lastIndex != index {
if err := w.Client.blockUntilReady(w.Ctx, w.Options); err != nil {
w.ErrChan <- err
continue
}
}
if res != nil {
lastIndex = res.LastIndex
w.Handler(lastIndex, res.Value)
}
}
}
func (w *Watcher) build() *api.QueryOptions {
q := DefaultQueryOptions()
if w.Options != nil {
q = w.Options
}
if w.Consistency != "" {
q.Consistency = w.Consistency
}
return q
}
func (w *Watcher) Stop() {
w.Cancel()
}
func (w *Watcher) Err() <-chan error {
return w.ErrChan
}
```
Watcher 结构体中包含了一些必要的信息,如客户端 Client、查询选项 Options、观察者 ID 等等。NewWatcher 函数用于创建一个新的 Watcher 实例,并启动一个新的 goroutine 来监听服务端的变化。watch 函数是 Watcher 的核心函数,它会不断地监听服务端的变化,并在服务端发生变化时调用 Handler 函数进行处理。当 Watcher 被停止时,Stop 函数会调用 Cancel 函数将 Watcher 的上下文取消,从而停止其监听服务端的变化。Err 函数返回一个 chan,用于接收 Watcher 中可能出现的错误信息。
阅读全文