Go语言学习之12 etcd、contex、kafka消费实例、logagent

本节内容:
    1. etcd介绍与使用
    2. ElastcSearch介绍与使用

1. etcd介绍与使用
    概念:高可用的分布式key-value存储,可以使用配置共享和服务发现
    类似项目:zookeeper和consul
    开发语言:Go
    接口:提供restful的http接口,使用简单
    实现算法:基于raft算法的强一致性、高可用的服务存储目录

2. etcd的应用场景
    a. 服务发现和服务注册
    b. 配置中心
    c. 分布式存储
    d. master选举

3. etcd搭建
    a. 下载etcd release版本:https://github.com/etcd-io/etcd/releases 版本
    b. 解压后,进入到etcd的根目录,直接执行./etcd 可以启动etcd
    c. 使用etcdctl工具更改配置

4. context使用介绍
    a. 如何控制goroutine
    b. 如何保存上下文数据

 (1)使用context处理超时
    ctx, cancel = context.With.Timeout(context.Background(), 2*time.Second)

    示例是通过设置ctx超时时间为2s,如果2s类无法接收到baidu的请求返回,则超时异常。

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "io/ioutil"
 7     "net/http"
 8     "time"
 9 )
10 type Result struct {
11     r   *http.Response
12     err error
13 }
14 
15 func process() {
16     ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
17     defer cancel()
18     tr := &http.Transport{}
19     client := &http.Client{Transport: tr}
20     c := make(chan Result, 1)
21     req, err := http.NewRequest("GET", "http://www.baidu.com", nil)
22     if err != nil {
23         fmt.Println("http request failed, err:", err)
24         return
25     }
26     go func() {
27         resp, err := client.Do(req)
28         pack := Result{r: resp, err: err}
29         c <- pack
30     }()
31     select {
32     case <-ctx.Done():
33         tr.CancelRequest(req)
34         res := <-c
35         fmt.Println("Timeout! err:", res.err)
36     case res := <-c:
37         defer res.r.Body.Close()
38         out, _ := ioutil.ReadAll(res.r.Body)
39         fmt.Printf("Server Response: %s", out)
40     }
41     return
42 }
43 func main() {
44     process()
45 }
ctx_timeout

(2) 使用context保存上下文
    利用context来保存上下文值:

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6 )
 7 
 8 func process(ctx context.Context) {
 9     ret,ok := ctx.Value("trace_id").(int)
10     if !ok {
11         ret = 21342423
12     }
13 
14     fmt.Printf("ret:%d
", ret)
15 
16     s , _ := ctx.Value("session").(string)
17     fmt.Printf("session:%s
", s)
18 }
19 
20 func main() {
21     ctx := context.WithValue(context.Background(), "trace_id", 13483434)
22     ctx = context.WithValue(ctx, "session", "sdlkfjkaslfsalfsafjalskfj")
23     process(ctx)
24 }
ctx_value

    同时还有context ctx_cancel 和 ctx_deadline

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 )
 8 
 9 func gen(ctx context.Context) <-chan int {
10     dst := make(chan int)
11     n := 1
12     go func() {
13         for {
14             select {
15             case <-ctx.Done():  //当43行的test函数执行结束之后,执行defer cancel(),则会触发该行
16                 fmt.Println("i exited")
17                 return // returning not to leak the goroutine
18             case dst <- n:
19                 n++
20             }
21         }
22     }()
23     return dst
24 }
25 
26 func test() {
27     // gen generates integers in a separate goroutine and
28     // sends them to the returned channel.
29     // The callers of gen need to cancel the context once
30     // they are done consuming generated integers not to leak
31     // the internal goroutine started by gen.
32     ctx, cancel := context.WithCancel(context.Background())
33     defer cancel() // cancel when we are finished consuming integers
34     intChan := gen(ctx)
35     for n := range intChan {
36         fmt.Println(n)
37         if n == 5 {
38             break
39         }
40     }
41 }
42 func main() {
43     test()
44     time.Sleep(time.Hour)
45 }
ctx_cancel
 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 )
 8 
 9 func main() {
10     d := time.Now().Add(50 * time.Millisecond)
11     ctx, cancel := context.WithDeadline(context.Background(), d)
12 
13     // Even though ctx will be expired, it is good practice to call its
14     // cancelation function in any case. Failure to do so may keep the
15     // context and its parent alive longer than necessary.
16     defer cancel()
17 
18     select {
19     case <-time.After(1 * time.Second):
20         fmt.Println("overslept")
21     case <-ctx.Done():
22         fmt.Println(ctx.Err())  //context deadline exceeded
23     }
24 
25 }
ctx_deadline

5. etcd介绍与使用
    etcd使用示例 (由于虚拟机出现问题,后面的程序全在Windows上面操作):

(1)客户端连接 etcd server端

 1 package main
 2 
 3 import (
 4     "fmt"
 5     //etcd_client "github.com/coreos/etcd/clientv3"
 6     etcd_client "go.etcd.io/etcd/clientv3"
 7     "time"
 8 )
 9 
10 func main() {
11 
12     cli, err := etcd_client.New(etcd_client.Config{
13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
14         DialTimeout: 5 * time.Second,
15     })
16     if err != nil {
17         fmt.Println("connect failed, err:", err)
18         return
19     }
20 
21     fmt.Println("connect succ")
22     defer cli.Close()
23 }
etcd_conn

(2)put 和 get

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "go.etcd.io/etcd/clientv3"
 7     "time"
 8 )
 9 
10 func main() {
11 
12     cli, err := clientv3.New(clientv3.Config{
13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
14         DialTimeout: 5 * time.Second,
15     })
16     if err != nil {
17         fmt.Println("connect failed, err:", err)
18         return
19     }
20 
21     fmt.Println("connect succ")
22     defer cli.Close()
23     //设置1秒超时,访问etcd有超时控制
24     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
25     //操作etcd
26     _, err = cli.Put(ctx, "/logagent/conf/", "192.168.0.1")
27     //操作完毕,取消etcd
28     cancel()
29     if err != nil {
30         fmt.Println("put failed, err:", err)
31         return
32     }
33     //取值,设置超时为1秒
34     ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
35     resp, err := cli.Get(ctx, "/logagent/conf/")
36     cancel()
37     if err != nil {
38         fmt.Println("get failed, err:", err)
39         return
40     }
41     for _, ev := range resp.Kvs {
42         fmt.Printf("%s : %s
", ev.Key, ev.Value)
43     }
44 }
put_get

(3)watch(观测key值发生变化)

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 
 8     etcd_client "go.etcd.io/etcd/clientv3"
 9 )
10 
11 func main() {
12 
13     cli, err := etcd_client.New(etcd_client.Config{
14         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
15         DialTimeout: 5 * time.Second,
16     })
17     if err != nil {
18         fmt.Println("connect failed, err:", err)
19         return
20     }
21     defer cli.Close()
22 
23     fmt.Println("connect succ")
24     
25     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
26     _, err = cli.Put(ctx, "/logagent/conf/", "99999")
27     if err != nil {
28         fmt.Println("put failed, err:", err)
29         return
30     }
31     cancel()
32     
33     fmt.Println("put succ")
34 
35     for {
36         rch := cli.Watch(context.Background(), "/logagent/conf/")
37         for wresp := range rch {
38             for _, ev := range wresp.Events {
39                 fmt.Printf("%s %q : %q
", ev.Type, ev.Kv.Key, ev.Kv.Value)
40             }
41         }
42     }
43 }
watch

    运行上面的watch程序监控key(/logagent/conf/)操作的变化,然后再运行(2)的程序,结果如下:

    kafka消费示例代码:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "strings"
 6     "sync"
 7 
 8     "github.com/Shopify/sarama"
 9 )
10 
11 var (
12     wg sync.WaitGroup
13 )
14 
15 func main() {
16 
17     consumer, err := sarama.NewConsumer(strings.Split("192.168:30.136:9092", ","), nil)
18     if err != nil {
19         fmt.Println("Failed to start consumer: %s", err)
20         return
21     }
22     partitionList, err := consumer.Partitions("nginx_log")
23     if err != nil {
24         fmt.Println("Failed to get the list of partitions: ", err)
25         return
26     }
27 
28     fmt.Println(partitionList)
29 
30     for partition := range partitionList {
31         pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
32         if err != nil {
33             fmt.Printf("Failed to start consumer for partition %d: %s
", partition, err)
34             return
35         }
36         defer pc.AsyncClose()
37         
38         go func(pc sarama.PartitionConsumer) {
39             wg.Add(1)
40             for msg := range pc.Messages() {
41                 fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
42                 fmt.Println()
43             }
44             wg.Done()
45         }(pc)
46     }
47     //time.Sleep(time.Hour)
48     wg.Wait()
49     consumer.Close()
50 }
kafka消费示例代码

6. sync.WaitGroup介绍
1)等待一组groutine结束
2)使用Add方法设置等待的数量加1
3)使用Delete方法设置等待的数量减1
4)当等待的数量等于0,Wait函数返回

sync.WaitGroup实例:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6     "time"
 7 )
 8 
 9 func main() {
10     wg := sync.WaitGroup{}
11     for i := 0; i < 10; i++ {
12         wg.Add(1)
13         go calc(&wg, i)
14     }
15 
16     wg.Wait() //阻塞,等待所有groutine结束
17     fmt.Println("all goroutine finish")
18 }
19 
20 func calc(w *sync.WaitGroup, i int) {
21     //注意: wg.Add(1) 放到这会有问题,也就是main函数结束比wg.Add(1)要快
22     fmt.Println("calc:", i)
23     time.Sleep(time.Second)
24     w.Done()
25 }
waitGroup示例

7. ElastcSearch安装及go操作es

(1)安装 es
   1)下载ES,下载地址:https://www.elastic.co/products/elasticsearch,我下载的是 elasticsearch-6.7.1.zip。
   2)修改在解压后根目录下的 /config/elasticsearch.yml 配置:

    放开注释并将 youIP换成你自己机器的 ip

cluster.name: my-application
node.name: node-1
network.host: youIP
http.port: 9200

   3)修改 /config/jvm.options 文件,当然如果机器性能好也可以不用修改:

-Xms512m
-Xmx512m

   4)进入根目录,启动es,.inelasticsearch.bat

(2)go 操作 es 示例

    安装第三方插件:

go get gopkg.in/olivere/elastic.v2

    示例:注意将程序里面的 url = "http://yourIP:9200/",yourIP替换为你安装es机器的 ip:

 1 package main
 2 
 3 import (
 4     "fmt"
 5 
 6     elastic "gopkg.in/olivere/elastic.v2"
 7 )
 8 
 9 type Tweet struct {
10     User    string
11     Message string
12 }
13 
14 var (
15     url = "http://yourIP:9200/"
16 )
17 
18 func main() {
19     client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(url))
20     if err != nil {
21         fmt.Println("connect es error", err)
22         return
23     }
24 
25     fmt.Println("conn es succ")
26 
27     tweet := Tweet{User: "olivere", Message: "Take Five"}
28     _, err = client.Index().
29         Index("twitter").
30         Type("tweet").
31         Id("1").
32         BodyJson(tweet).
33         Do()
34     if err != nil {
35         // Handle error
36         panic(err)
37         return
38     }
39 
40     fmt.Println("insert succ")
41 }
es示例

    链式存储:

 1 package main
 2 
 3 import "fmt"
 4 
 5 type Stu struct {
 6     Name string
 7     Age  int
 8 }
 9 
10 func (p *Stu) SetName(name string) *Stu {
11     p.Name = name
12     return p
13 }
14 
15 func (p *Stu) SetAge(age int) *Stu {
16     p.Age = age
17     return p
18 }
19 
20 func (p *Stu) Print() {
21     fmt.Printf("age:%d name:%s
", p.Age, p.Name)
22 }
23 
24 func main() {
25     stu := &Stu{}
26     stu.SetAge(12).SetName("stu01").Print()
27     //stu.SetName("stu01")
28     //stu.Print()
29 }
链式存储示例
原文地址:https://www.cnblogs.com/xuejiale/p/10660857.html