etcdctl基本使用
- 增加一条数据
etcdctl put "/school/class/name" "helios" - 获取一条数据
etcdctl get "/school/class/name" - 得到一组数据
etcdctl get "/school/class/" --prefix - 得到所有的key
etcdctl --keys-only=true get "" --from-key - 得到所有的key与value
etcdctl get "" --from-key
ETCDCTL_API=3 etcdctl get "" --from-key - 删除一条数据
etcdctl del "/school/class/name2" - 关于集群的操作
指定endpoints: export ENDPOINTS="172.27.143.50:2379"
查看集群状态:etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
查看集群成员:etcdctl --write-out=table --endpoints=$ENDPOINTS member list
删除集群成员:
MEMBER_ID=8e9e05c52164694d
etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID} - 碎片整理
etcdctl --endpoints=$ENDPOINTS defrag - 备份当前ETCD集群
etcdctl snapshot save snapshot.db - 查看snapshot状态
etcdctl snapshot status snapshot.db - 从备份中恢复集群
etcdctl snapshot save snapshot.db - 切换leader
# 先看状态
etcdctl endpoint --cluster=true status -w table
# move-leader
./etcdctl move-leader d6414a7c7c550d29
连接ETCD
var ( config clientv3.Config client *clientv3.Client err error ) // 客户端配置 config = clientv3.Config{ Endpoints: []string{"172.27.43.50:2379"}, DialTimeout: 5 * time.Second, } // 建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return }
写入数据到ETCD
// 实例化一个用于操作ETCD的KV kv = clientv3.NewKV(client) if putResp, err = kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV()); err != nil { fmt.Println(err) return } fmt.Println(putResp.Header.Revision) if putResp.PrevKv != nil { fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n", string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version) }
获取ETCD里面的数据
// 实例化一个用于操作ETCD的KV kv = clientv3.NewKV(client) if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil { fmt.Println(err) return } // 输出本次的Revision fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)
删除ETCD里面的数据
kv = clientv3.NewKV(client) _, err = kv.Put(context.TODO(), "/school/class/students", "helios1") if delResp, err = kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV()); err != nil { fmt.Println(err) return } if len(delResp.PrevKvs) != 0 { for _, kvpair = range delResp.PrevKvs { fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value)) } }
申请一个10s的租约,定时查看是否过期
// 申请一个租约 lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil { fmt.Println(err) return } leaseId = leaseGrantResp.ID // 获得kv API子集 kv = clientv3.NewKV(client) if _, err = kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } for { if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv过期了") break } fmt.Println("还没过期:", getResp.Kvs) time.Sleep(2 * time.Second) }
自动续租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil { fmt.Println(err) return } go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了") goto END } else { // 每秒会续租一次, 所以就会受到一次应答 fmt.Println("收到自动续租应答:", keepResp.ID) } } } END: }()
对某个key进行监听5s
kv = clientv3.NewKV(client) // 模拟KV的变化 go func() { for { _ , err = kv.Put(context.TODO(), "/school/class/students", "helios1") _, err = kv.Delete(context.TODO(), "/school/class/students") time.Sleep(1 * time.Second) } }() // 先GET到当前的值,并监听后续变化 if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil { fmt.Println(err) return } // 现在key是存在的 if len(getResp.Kvs) != 0 { fmt.Println("当前值:", string(getResp.Kvs[0].Value)) } // 获得当前revision watchStartRevision = getResp.Header.Revision + 1 // 创建一个watcher watcher = clientv3.NewWatcher(client) fmt.Println("从该版本向后监听:", watchStartRevision) ctx, cancelFunc := context.WithCancel(context.TODO()) time.AfterFunc(5 * time.Second, func() { cancelFunc() }) watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision)) // 处理kv变化事件 for watchResp = range watchRespChan { for _, event = range watchResp.Events { switch event.Type { case mvccpb.PUT: fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision) case mvccpb.DELETE: fmt.Println("删除了", "Revision:", event.Kv.ModRevision) } } }
通过op方法代替kv.Get/kv.Put/kv.Delete
putOp = clientv3.OpPut("/school/class/students", "helios") if opResp, err = kv.Do(context.TODO(), putOp); err != nil { panic(err) } fmt.Println("写入Revision:", opResp.Put().Header.Revision) getOp = clientv3.OpGet("/school/class/students") if opResp, err = kv.Do(context.TODO(), getOp); err != nil { panic(err) } fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
通过txn实现分布式锁
ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。
实现分布式锁主要分为三个步骤:
1. 上锁,包括创建租约、自动续约、在租约时间内去抢一个key
2. 抢到锁后执行业务逻辑,没有抢到退出
3. 释放租约
// 1. 上锁 // 1.1 创建租约 lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { panic(err) } leaseId = leaseGrantResp.ID // 1.2 自动续约 // 创建一个可取消的租约,主要是为了退出的时候能够释放 ctx, cancelFunc = context.WithCancel(context.TODO()) // 3. 释放租约 defer cancelFunc() defer lease.Revoke(context.TODO(), leaseId) if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { panic(err) } // 续约应答 go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了") goto END } else { // 每秒会续租一次, 所以就会受到一次应答 fmt.Println("收到自动续租应答:", keepResp.ID) } } } END: }() // 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key) kv = clientv3.NewKV(client) // 创建事物 txn = kv.Txn(context.TODO()) //if 不存在key, then 设置它, else 抢锁失败 txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)). Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("lock")) // 提交事务 if txnResp, err = txn.Commit(); err != nil { panic(err) } if !txnResp.Succeeded { fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 2. 抢到锁后执行业务逻辑,没有抢到退出 fmt.Println("处理任务") time.Sleep(5 * time.Second) // 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了
参考链接:https://zhuanlan.zhihu.com/p/111800017