[go]etcd使用


// 连接etcd
import (
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)
config = clientv3.Config{
	Endpoints: []string{"127.0.0.1:2379"},
	DialTimeout: 5 * time.Second,
}

client, err = clientv3.New(config)
kv = clientv3.NewKV(client)
// put
putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV())
putResp.Header.Revision
putResp.PrevKv.Value
// get
getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/)
getResp.Kvs
getResp.Count

// get prefix
getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
getResp.Kvs

// delete
delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2))
for _, kvpair = range delResp.PrevKvs {
	fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
}
// lease: 关注key是否存在

lease = clientv3.NewLease(client)
leaseGrantResp, err = lease.Grant(context.TODO(), 10)
keepRespChan, err = lease.KeepAlive(context.TODO(), leaseGrantResp.id) //自动续约

putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseGrantResp.id))
putResp.Header.Revision
// watch:  关注key的变化

getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7") //先get到当前值,监听后续变化
watchStartRevision = getResp.Header.Revision + 1
watcher = clientv3.NewWatcher(client)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
	cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

for watchResp = range watchRespChan {
	for _, event = range watchResp.Events {
		switch event.Type {
		case mvccpb.PUT:
			fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
		case mvccpb.DELETE:
			fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
		}
	}
}
// op操作(类似条件一样的)
putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
opResp, err = kv.Do(context.TODO(), putOp)

// kv.Put
// kv.Get
// kv.Delete

opResp.Put().Header.Revision

getOp = clientv3.OpGet("/cron/jobs/job8")
opResp, err = kv.Do(context.TODO(), getOp)
opResp.Get().Kvs[0].ModRevision
opResp.Get().Kvs[0].Value
// 事务机制: 去锁资源, 判断lock目录key是否存在, 如果不存在则put进去, 标记抢锁成功, 如果存在则象征性get下, 标记强锁失败
lease = clientv3.NewLease(client) //创建租约, 自动续租, 拿着租约去抢占一个key
leaseGrantResp, err = lease.Grant(context.TODO(), 5)
defer lease.Revoke(context.TODO(), leaseGrantResp.id)
defer lease.Revoke(context.TODO(), leaseGrantResp.id)
keepRespChan, err = lease.KeepAlive(ctx, leaseId) //5秒后会取消自动续租


go func() {
	for {
		select {
		case keepResp = <- keepRespChan:
			if keepRespChan == nil {
				fmt.Println("租约已经失效了")
				goto END
			} else {	// 每秒会续租一次, 所以就会受到一次应答
				fmt.Println("收到自动续租应答:", keepResp.ID)
			}
		}
	}
END:
}()

kv = clientv3.NewKV(client)  //  if 不存在key, then 设置它, else 抢锁失败
txn = kv.Txn(context.TODO())  // 创建事务

//if条件成立,走then,否则走else
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
	Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
	Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
txn.Commit()
txnResp.Succeeded //抢锁成功

理解事务: 一个curl的例子

// python2
// 已在etcd中创建了key1:0,A:"success",B:"failure"三个键值对
import json
import base64
import requests

URL = "http://127.0.0.1:2379/v3beta/kv/%s"

url = URL % "txn"

payload = {
    "compare":[
        {
            "target": "VALUE",
            "key":base64.b64encode("key1"),
            'result': "EQUAL",
            "value": base64.b64encode("0"),
        },
    ],
    "success":[
        {
            "requestRange":{
                "key":base64.b64encode("A"),
            }
        }
    ],
    "failure":[
        {
            "requestRange":{
                "key":base64.b64encode("B"),
            }
        }
    ]
}
resp = requests.post(url,json=payload)
print json.dumps(resp.json(), indent=2)

所谓的支持事务锁: 里面的操作是原子的

      // 发起转账视图
      txn := etcd.Txn(ctx.TODO()).If(
          v3.Compare(v3.ModRevision(from), “=”, fromKV.ModRevision),  // 事务提交时,from账户余额没有没有变动
          v3.Compare(v3.ModRevision(to), “=”, toKV.ModRevision))      // 事务提交时,to账户余额没有变动
      txn = txn.Then(
          OpPut(from, fromUint64(fromV - amount)),  // 更新from账户余额
          OpPut(to, fromUint64(toV - amount))       // 更新to账户余额
      putresp, err := txn.Commit()   // 提交事务

[etcd]分布式锁

etcd election特性使用场景——Master选举分析与实现

原文地址:https://www.cnblogs.com/iiiiiher/p/11961054.html