Golang etcd ClientV3 的使用

Jackey Golang 123 次浏览 , 没有评论

etcdctl基本使用

  1. 增加一条数据
    etcdctl put "/school/class/name" "helios"
  2. 获取一条数据
    etcdctl get "/school/class/name"
  3. 得到一组数据
    etcdctl get "/school/class/" --prefix
  4. 得到所有的key
    etcdctl --keys-only=true get "" --from-key
  5. 得到所有的key与value
    etcdctl get "" --from-key
    ETCDCTL_API=3 etcdctl get "" --from-key
  6. 删除一条数据
    etcdctl del "/school/class/name2"
  7. 关于集群的操作
    指定endpoints: export ENDPOINTS="172.27.143.50:2379"
    查看集群状态:etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
    查看集群成员:etcdctl --write-out=table --endpoints=$ENDPOINTS member list
    删除集群成员:
    MEMBER_ID=8e9e05c52164694d
    etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID}
  8. 碎片整理
    etcdctl --endpoints=$ENDPOINTS defrag
  9. 备份当前ETCD集群
    etcdctl snapshot save snapshot.db
  10. 查看snapshot状态
    etcdctl snapshot status snapshot.db
  11. 从备份中恢复集群
    etcdctl snapshot save snapshot.db
  12. 切换leader
    # 先看状态
    etcdctl endpoint --cluster=true status -w table
    # move-leader
    ./etcdctl move-leader d6414a7c7c550d29

连接ETCD

var (
    config clientv3.Config
    client *clientv3.Client
    err error
)
// 客户端配置
config = clientv3.Config{
    Endpoints: []string{"172.27.43.50:2379"},
    DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
    fmt.Println(err)
    return
}

写入数据到ETCD

// 实例化一个用于操作ETCD的KV
    kv = clientv3.NewKV(client)
    if putResp, err = kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV()); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(putResp.Header.Revision)
    if putResp.PrevKv != nil {
        fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n",
            string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version)
    }

获取ETCD里面的数据

// 实例化一个用于操作ETCD的KV
kv = clientv3.NewKV(client)
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
    fmt.Println(err)
    return
}
// 输出本次的Revision
fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)

删除ETCD里面的数据

kv = clientv3.NewKV(client)
_, err = kv.Put(context.TODO(), "/school/class/students", "helios1")

if delResp, err = kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV()); err != nil {
    fmt.Println(err)
    return
}
if len(delResp.PrevKvs) != 0 {
    for _, kvpair = range delResp.PrevKvs {
        fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value))
    }
}

申请一个10s的租约,定时查看是否过期

// 申请一个租约
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
    fmt.Println(err)
    return
}
leaseId = leaseGrantResp.ID

// 获得kv API子集
kv = clientv3.NewKV(client)

if _, err = kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId)); err != nil {
    fmt.Println(err)
    return
}

for {
    if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
        fmt.Println(err)
        return
    }
    if getResp.Count == 0 {
        fmt.Println("kv过期了")
        break
    }
    fmt.Println("还没过期:", getResp.Kvs)
    time.Sleep(2 * time.Second)
}

自动续租

if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
    fmt.Println(err)
    return
}
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租约已经失效了")
                goto END
            } else {    // 每秒会续租一次, 所以就会受到一次应答
                fmt.Println("收到自动续租应答:", keepResp.ID)
            }
        }
    }
END:
}()

对某个key进行监听5s

kv = clientv3.NewKV(client)

// 模拟KV的变化
go func() {
    for {
        _ , err = kv.Put(context.TODO(), "/school/class/students", "helios1")
        _, err = kv.Delete(context.TODO(), "/school/class/students")
        time.Sleep(1 * time.Second)
    }
}()

// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
    fmt.Println(err)
    return
}

// 现在key是存在的
if len(getResp.Kvs) != 0 {
    fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}

// 获得当前revision
watchStartRevision = getResp.Header.Revision + 1
// 创建一个watcher
watcher = clientv3.NewWatcher(client)
fmt.Println("从该版本向后监听:", watchStartRevision)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
    cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
    for _, event = range watchResp.Events {
        switch event.Type {
        case mvccpb.PUT:
            fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
        case mvccpb.DELETE:
            fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
        }
    }
}

通过op方法代替kv.Get/kv.Put/kv.Delete

putOp = clientv3.OpPut("/school/class/students", "helios")

if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
    panic(err)
}
fmt.Println("写入Revision:", opResp.Put().Header.Revision)

getOp = clientv3.OpGet("/school/class/students")

if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
    panic(err)
}
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))

通过txn实现分布式锁

ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。

实现分布式锁主要分为三个步骤:

1. 上锁,包括创建租约、自动续约、在租约时间内去抢一个key

2. 抢到锁后执行业务逻辑,没有抢到退出

3. 释放租约

 

// 1. 上锁
// 1.1 创建租约
lease = clientv3.NewLease(client)

if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
    panic(err)
}
leaseId = leaseGrantResp.ID

// 1.2 自动续约
// 创建一个可取消的租约,主要是为了退出的时候能够释放
ctx, cancelFunc = context.WithCancel(context.TODO())

// 3. 释放租约
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)

if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
    panic(err)
}
// 续约应答
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租约已经失效了")
                goto END
            } else {    // 每秒会续租一次, 所以就会受到一次应答
                fmt.Println("收到自动续租应答:", keepResp.ID)
            }
        }
    }
END:
}()

// 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key)
kv = clientv3.NewKV(client)

// 创建事物
txn = kv.Txn(context.TODO())

//if 不存在key, then 设置它, else 抢锁失败
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
    Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
    Else(clientv3.OpGet("lock"))

// 提交事务
if txnResp, err = txn.Commit(); err != nil {
    panic(err)
}

if !txnResp.Succeeded {
    fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
    return
}

// 2. 抢到锁后执行业务逻辑,没有抢到退出
fmt.Println("处理任务")
time.Sleep(5 * time.Second)

// 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了

参考链接:https://zhuanlan.zhihu.com/p/111800017

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

Go