etcdctl基本使用
- 增加一条数据
etcdctl put "/school/class/name" "helios" - 获取一条数据
etcdctl get "/school/class/name" - 得到一组数据
etcdctl get "/school/class/" --prefix - 得到所有的key
etcdctl --keys-only=true get "" --from-key - 得到所有的key与value
etcdctl get "" --from-key
ETCDCTL_API=3 etcdctl get "" --from-key - 删除一条数据
etcdctl del "/school/class/name2" - 关于集群的操作
指定endpoints: export ENDPOINTS="172.27.143.50:2379"
查看集群状态:etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
查看集群成员:etcdctl --write-out=table --endpoints=$ENDPOINTS member list
删除集群成员:
MEMBER_ID=8e9e05c52164694d
etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID} - 碎片整理
etcdctl --endpoints=$ENDPOINTS defrag - 备份当前ETCD集群
etcdctl snapshot save snapshot.db - 查看snapshot状态
etcdctl snapshot status snapshot.db - 从备份中恢复集群
etcdctl snapshot save snapshot.db - 切换leader
# 先看状态
etcdctl endpoint --cluster=true status -w table
# move-leader
./etcdctl move-leader d6414a7c7c550d29
连接ETCD
var (
config clientv3.Config
client *clientv3.Client
err error
)
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"172.27.43.50:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
写入数据到ETCD
// 实例化一个用于操作ETCD的KV
kv = clientv3.NewKV(client)
if putResp, err = kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
fmt.Println(putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n",
string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version)
}
获取ETCD里面的数据
// 实例化一个用于操作ETCD的KV
kv = clientv3.NewKV(client)
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
// 输出本次的Revision
fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)
删除ETCD里面的数据
kv = clientv3.NewKV(client)
_, err = kv.Put(context.TODO(), "/school/class/students", "helios1")
if delResp, err = kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
if len(delResp.PrevKvs) != 0 {
for _, kvpair = range delResp.PrevKvs {
fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value))
}
}
申请一个10s的租约,定时查看是否过期
// 申请一个租约
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}
leaseId = leaseGrantResp.ID
// 获得kv API子集
kv = clientv3.NewKV(client)
if _, err = kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
for {
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}
自动续租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}
go func() {
for {
select {
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
对某个key进行监听5s
kv = clientv3.NewKV(client)
// 模拟KV的变化
go func() {
for {
_ , err = kv.Put(context.TODO(), "/school/class/students", "helios1")
_, err = kv.Delete(context.TODO(), "/school/class/students")
time.Sleep(1 * time.Second)
}
}()
// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}
// 获得当前revision
watchStartRevision = getResp.Header.Revision + 1
// 创建一个watcher
watcher = clientv3.NewWatcher(client)
fmt.Println("从该版本向后监听:", watchStartRevision)
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
cancelFunc()
})
watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}
通过op方法代替kv.Get/kv.Put/kv.Delete
putOp = clientv3.OpPut("/school/class/students", "helios")
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
panic(err)
}
fmt.Println("写入Revision:", opResp.Put().Header.Revision)
getOp = clientv3.OpGet("/school/class/students")
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
panic(err)
}
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
通过txn实现分布式锁
ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。
实现分布式锁主要分为三个步骤:
1. 上锁,包括创建租约、自动续约、在租约时间内去抢一个key
2. 抢到锁后执行业务逻辑,没有抢到退出
3. 释放租约
// 1. 上锁
// 1.1 创建租约
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
panic(err)
}
leaseId = leaseGrantResp.ID
// 1.2 自动续约
// 创建一个可取消的租约,主要是为了退出的时候能够释放
ctx, cancelFunc = context.WithCancel(context.TODO())
// 3. 释放租约
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
panic(err)
}
// 续约应答
go func() {
for {
select {
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key)
kv = clientv3.NewKV(client)
// 创建事物
txn = kv.Txn(context.TODO())
//if 不存在key, then 设置它, else 抢锁失败
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("lock"))
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
panic(err)
}
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 抢到锁后执行业务逻辑,没有抢到退出
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了
参考链接:https://zhuanlan.zhihu.com/p/111800017