失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Golang etcd服务注册与发现

Golang etcd服务注册与发现

时间:2023-03-10 22:11:12

相关推荐

Golang etcd服务注册与发现

//sevice.go

package discoveryimport ("context""errors""sync""time""/coreos/etcd/clientv3"l4g "/alecthomas/log4go")type Service struct {closeChan chan struct{} //关闭通道client *clientv3.Client //etcd v3 clientleaseID clientv3.LeaseID //etcd 租约idkey string //键val string //值 wg sync.WaitGroup}// NewService 构造一个注册服务func NewService(etcdEndpoints []string, key string, val string) (*Service, error) {cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints,DialTimeout: 2 * time.Second,})if nil != err {return nil, err}s := &Service{client: cli,closeChan: make(chan struct{}),key: key,val: val,}return s, nil}// Start 开启注册// @param - ttlSecond 租期(秒)func (s *Service) Start(ttlSecond int64) error {// minimum lease TTL is 5-secondresp, err := s.client.Grant(context.TODO(), ttlSecond)if err != nil {panic(err)}s.leaseID = resp.ID_, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID))if err != nil {panic(err)}ch, err1 := s.client.KeepAlive(context.TODO(), s.leaseID)if nil != err1 {panic(err)}l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val)s.wg.Add(1)defer s.wg.Done()for {select {case <-s.closeChan:return s.revoke()case <-s.client.Ctx().Done():return errors.New("server closed")case ka, ok := <-ch:if !ok {l4g.Warn("[discovery] Service Start keep alive channel closed")return s.revoke()} else {l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)}}}return nil}// Stop 停止func (s *Service) Stop() {close(s.closeChan)s.wg.Wait()s.client.Close()}func (s *Service) revoke() error {_, err := s.client.Revoke(context.TODO(), s.leaseID)if err != nil {l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())} else {l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)}return err}

//watch.go

package discoveryimport ("context""os""time""/coreos/etcd/clientv3""/coreos/etcd/mvcc/mvccpb"l4g "/alecthomas/log4go""/grpc/grpclog")type GroupManager struct {wgsync.WaitGroupctx context.Contextcancel context.CancelFunconce sync.Once}func NewGroupManager() *GroupManager {ret := new(GroupManager)ret.ctx, ret.cancel = context.WithCancel(context.Background())return ret}func (this *GroupManager) Close() {this.once.Do(this.cancel)}func (this *GroupManager) Wait() {this.wg.Wait()}func (this *GroupManager) Add(delta int) {this.wg.Add(delta)}func (this *GroupManager) Done() {this.wg.Done()}func (this *GroupManager) Chan() <-chan struct{} {return this.ctx.Done()}type Target interface {Set(string, string)Create(string, string)Modify(string, string)Delete(string)}type Config struct {Servers []stringDailTimeout int64RequestTimeout int64Prefix boolTarget string}func Watch(gm *GroupManager, cfg *Config, target Target) {defer gm.Done()cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.Servers,DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,})if err != nil {panic(err.Error())return}defer cli.Close() // make sure to close the clientl4g.Info("[discovery] start watch %s", cfg.Target)ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)var resp *clientv3.GetResponseif cfg.Prefix {resp, err = cli.Get(ctx, cfg.Target, clientv3.WithPrefix())} else {resp, err = cli.Get(ctx, cfg.Target)}cancel()if err != nil {panic(err.Error())}for _, ev := range resp.Kvs {target.Set(string(string(ev.Key)), string(ev.Value))}var rch clientv3.WatchChanif cfg.Prefix {rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))} else {rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithRev(resp.Header.Revision+1))}for {select {case <-gm.Chan():l4g.Info("[discovery] watch %s close", cfg.Target)returncase wresp := <-rch:err := wresp.Err()if err != nil {l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())gm.Close()return}l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)for _, ev := range wresp.Events {if ev.IsCreate() {target.Create(string(ev.Kv.Key), string(ev.Kv.Value))} else if ev.IsModify() {target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))} else if ev.Type == mvccpb.DELETE {target.Delete(string(ev.Kv.Key))} else {l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)}}}}}

如果觉得《Golang etcd服务注册与发现》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。