如何分析VCS监控采集的共享内存和实战原理

技术怎么解析共享内存原理与VCS监控采集实战本篇文章给大家分享的是有关怎么解析共享内存原理与VCS监控采集实战,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

本文是关于如何分析共享内存的原理和VCS监控采集的实际情况。边肖觉得挺实用的,所以分享给大家学习。希望你看完这篇文章能有所收获。让我们和边肖一起看看。

00-1010共享内存广泛应用于Redis、Kafka、RabbitMQ等高性能组件。本文主要提供了一个共享内存中广告埋点数据采集的实用场景。

00-1010

一、前言

在Linux中,每个进程都有自己的进程控制块(PCB)和地址空间(Addr Space),每个进程都有对应的页表,页表负责映射进程的虚拟地址和物理地址,并通过内存管理单元(MMU)进行管理。两个不同的虚拟地址通过页表映射到物理空间的相同区域,它们指向的区域是共享内存。

当两个进程通过页表将虚拟地址映射到物理地址时,物理地址中有一个公共的内存区域,即共享内存,两个进程可以同时看到。这样,当一个进程写,另一个进程读时,就可以实现进程间通信。然而,我们想确保一个进程在写入时不能被读取,所以我们使用信号量来实现同步和互斥。

对于共享内存,采用引用计数原则。当进程离开共享内存区时,计数器减1。试管架成功后,计数器增加1。只有计数器变为零,才能删除。当进程终止时,附加到它的共享内存区域将自动分离。

怎么解析共享内存原理与VCS监控采集实战

00-1010共享内存可以说是进程间通信最有用的方式,也是IPC最快的形式,因为进程可以直接读写内存,而无需复制任何数据。对于流水线、消息队列等通信方式,需要在内核和用户空间复制数据四次,而共享内存只复制数据:两次,一次从输入文件复制到共享内存区域,一次从共享内存区域复制到输出文件。

事实上,当进程存储在共享内存中时,它们并不总是在读写少量数据后取消映射,然后在有新的通信时重新建立共享内存区域。取而代之的是,共享区一直保留到通信结束,这样数据内容就一直保留在共享内存中,文件不会被写回。未映射时,共享内存的内容通常会写回文件。因此,使用共享内存进行通信的效率非常高。

传统文件

UNIX访问文件的传统方法是用open打开文件。如果多个进程访问同一个文件,每个进程在其自己的地址空间中都包含该文件的副本,这就不必要地浪费了存储空间。

下图说明了两个进程同时读取文件同一页的情况。为了将页面从磁盘读取到缓存,每个进程在内存中执行另一个复制操作,将数据从缓存读取到自己的地址空间。

怎么解析共享内存原理与VCS监控采集实战

共享存储映射

现在考虑另一种处理方法:进程A和进程B都将页面映射到自己的地址空间,当进程A第一次访问页面中的数据时,会产生缺页中断。此时,内核将这个页面读入内存,并更新页面表以指向它。之后,当进程B访问同一个页面,出现缺页中断时,该页面已经在内存中,内核只需要将进程B的页表条目指向次页即可。

怎么解析共享内存原理与VCS监控采集实战

二、共享内存原理

(1)mmap()系统调用

mmap()系统调用使进程能够通过映射相同的普通文件来共享内存。普通文件映射到进程地址空间后,进程可以像普通内存一样访问文件,无需调用read()、write()等操作。

mmap()系统调用形式如下:

Void * mmap (void * addr,size _ tlen,intprot,intflags,intfd,off _ toffset) mmap用于将文件描述符fd指定的文件的[off,off len]区域映射到调用进程的[addr,addr len]的内存区域:

怎么解析共享内存原理与VCS监控采集实战

数字fd即将

映射到进程空间的文件描述字,一般由open()返回,同时,fd可以指定为-1,此时须指定flags参数中的,MAP_ANON,表明进行的是匿名映射(不涉及具体的文件名,避免了文件的创建及打开,很显然只能用于具有亲缘关系的进程间通信)。

  • len是映射到调用进程地址空间的字节数,它从被映射文件开头offset个字节开始算起。

  • prot 参数指定共享内存的访问权限。可取如下几个值的或:PROT_READ(可读) , PROT_WRITE (可写), PROT_EXEC (可执行), PROT_NONE(不可访问)。

  • flags由以下几个常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必选其一,而MAP_FIXED则不推荐使用。

  • offset参数一般设为0,表示从文件头开始映射。

  • 参数addr指定文件应被映射到进程空间的起始地址,一般被指定一个空指针,此时选择起始地址的任务留给内核来完成。函数的返回值为最后文件映射到进程空间的地址,进程可直接操作起始地址为该值的有效地址。

  • (2)mmap()返回地址的访问

    对mmap()返回地址的访问,linux采用的是页式管理机制。

    对于用mmap()映射普通文件来说,进程会在自己的地址空间新增一块空间,空间大小由mmap()的len参数指定,注意,进程并不一定能够对全部新增空间都能进行有效访问。

    进程能够访问的有效地址大小取决于文件被映射部分的大小。

    简单的说,能够容纳文件被映射部分大小的最少页面个数决定了进程从mmap()返回的地址开始,能够有效访问的地址空间大小。

    超过这个空间大小,内核会根据超过的严重程度返回发送不同的信号给进程。可用如下图示说明:

    怎么解析共享内存原理与VCS监控采集实战

    2、分区读写

     为了要确保一个进程在写的时候不能被读,我们使用idx来标记可读块。

    怎么解析共享内存原理与VCS监控采集实战

    3、规则,指标和值

    下图描述的是从连续内存空间转化成【规则,维度,值】语义的过程:

    怎么解析共享内存原理与VCS监控采集实战

    4、源码分析

    怎么解析共享内存原理与VCS监控采集实战

    5、general.proto 

    通用监控上报协议:

    general.proto
     
    syntax = "proto2";
    package general;
    message Data {
        map<string, string> kv = 1;
    }
    message GeneralData {
        optional string rule_id = 1;
        repeated Data data = 2;
        optional int64 count = 3;
        optional int64 left_size = 4;
        optional int32 version = 5;
    }

    6、constant.go 配置参数

    | 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte预留长度 | magincNum2(4byte) | 4k protect |

     

    package moni_shm
     
    const (
       OssShmId           uint32 = 0x3eeff00
       MagicNum1          uint32 = 0x650a218
       MagicNum2          uint32 = 0x138a4f2
       CreateShmLock             = "/var/run/.oss_shm_lock"
       OssMapOneAttrCnt          = 1024 * 128      //1024 个规则
       OssOneAttrEntryCnt        = 128             //每个规则有128个指标
       EntrySz                   = 4
       OssMapCnt                 = 2
     
       OneAttrSz = OssOneAttrEntryCnt * EntrySz
       OssMapSz  = OssMapOneAttrCnt * OneAttrSz
       OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
     
       defaultIntervalSec = 60
       defaultTopic       = "moni_general_shared_memory"
    )

    7、util.go 工具类

    内存清零工具和"整页"分配:

    cd package moni_shm
    import (
        "unsafe"
    )
    //取整分配
    func align(actual, to uint64) uint64 {
        return (actual + to - 1) / to * to
    }
    //连续空间清0
    func zero(ptr uintptr, bts uint64) {
        if 0 == bts {
            return
        }
        const sz = 4096
        var next uint64
        cnt := 0
        for ; next+sz <= bts; {  //按页清零
            arr := (*[sz]byte)(unsafe.Pointer(ptr))
            for i := range *arr {
                (*arr)[i] = 0
            }
            next += sz
            ptr += uintptr(sz)
            cnt++
        }
        if next == bts {
            return
        }
        var i uintptr
        for i = 0; i < uintptr(bts-next); i++ { //剩余空间清零
            *(*byte)(unsafe.Pointer(ptr + i)) = 0
        }
    }

    8、mgr.go 采集逻辑

    共享内存采集逻辑对应 “规则指标和值”:

    var (
        _basePtr     uintptr = 0
        _shmUtil             = NewShmUtil(OssShmId, OssAttrSz)
        _intervalSec         = defaultIntervalSec
        _topic               = defaultTopic
        _on          bool    = false
    )
    func Stat(on bool) {
        _on = on
    }
    func Start() {
        go collect() //开始采集
    }
    func tryInitBaseptr() error {
        var err error
        if _basePtr == 0 {
            _basePtr, err = _shmUtil.GetData() //获取当前共享内存数据块首地址
            if nil != err {
                logrus.Warnf("init base ptr failed, retrying: %v", err)
            }
        }
        return err
    }
    func collect() {
        var (
            cost  time.Duration
            start time.Time
            first = true
        )
        for {
            if !first {
                time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期对齐
            }
            first = false
            start = time.Now()
            if !_on {
                cost = time.Since(start)
                continue
            }
            if _basePtr == 0 {
                if err := tryInitBaseptr(); nil != err {
                    cost = time.Since(start)
                    continue
                }
            }
            d := collectOnce()
            for _, v := range d {
                moni_report.ProductReportData(*v)
            }
            cost = time.Since(start)
        }
    }
     
    func collectOnce() []*moni_report.ReportData {
       now := time.Now()
       var ret []*moni_report.ReportData
       data := make(map[uint32]*general.GeneralData)
     
       d := SwitchAndFetch(_basePtr)
       logrus.Infof("sending %d data from shm", len(d))
     
       for _, v := range d {
          ruleId := strconv.FormatUint(uint64(v[0]), 10)
          dim := strconv.FormatUint(uint64(v[1]), 10)
          value := strconv.FormatUint(uint64(v[2]), 10)
     
          if _, ok := data[v[0]]; !ok {
             data[v[0]] = &general.GeneralData{
                RuleId: proto.String(ruleId),
                Data:   []*general.Data{},
             }
          }
     
          data[v[0]].Data = append(data[v[0]].Data, &general.Data{
             Kv: map[string]string{
                dim:         value,
                "timestamp": strconv.FormatInt(now.Unix()*1000, 10),
                "ip":        viper.GetString("host.inner_ip"),
             },
          })
       }
       logrus.Infof("collect format shm data:%v", data)
       for _, v := range data {
          bts, err := proto.Marshal(v)
          if nil != err {
             logrus.Errorf("marshal shm data failed: %v", err)
             continue
          }
          ret = append(ret, &moni_report.ReportData{
             DataBytes: bts,
             Topic:     _topic,
          })
       }
     
       return ret
    }

    9、shmutil.go 共享内存操作

    每60秒根据idx值切换可读区,采集后上报后,清零,切换到下一区。

    package moni_shm
    import (
        "fmt"
        "log"
        "os"
        "syscall"
        "unsafe"
        "github.com/sirupsen/logrus"
    )
    const (
        IpcCreate = 00001000
    )
    var (
        ErrNotCreated   = fmt.Errorf("shm not created")
        ErrCreateFailed = fmt.Errorf("shm create failed")
    )
    type shmOpt func(*ShmUtil)
    func WithCreate(b bool) shmOpt {
        return func(u *ShmUtil) {
            u.create = b
        }
    }
    /*共享内存数据结构
         |1page mprotect|page align data|1page mprotect|
         | 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte预留长度 | magincNum2(4byte) | 4k protect |
    */
    type ShmUtil struct {
        pageSz int
        dataSz uint64
        total  uint64
        shmKey uint32
        create bool
        base uintptr
        data uintptr
    }
    func NewShmUtil(key uint32, sz uint64, cfgs ...shmOpt) *ShmUtil {
        if key == 0 {
            panic("invalid shm key: 0")
        }
        ret := &ShmUtil{
            dataSz: sz,
            shmKey: key,
        }
        ret.pageSz = os.Getpagesize() //获取页大小
        ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按页分配“包体”大小
        ret.total = ret.dataSz + uint64(ret.pageSz)*2     // 总空间大小=包体大小 + 头尾各2页保护地址
        for _, c := range cfgs {
            c(ret)
        }
        return ret
    }
    func (s *ShmUtil) attachShm(flag int) error {
        created := false
        shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag))     //使用已存在的共享内存,返回共享内存标识符
        if 0 != errno {
            return errno
        }
        if shmid < 0 {
            if !s.create {  //不允创建,直接返回
                return ErrNotCreated
            }
            shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate))  //新创建共享内存
            if 0 != errno {
                return fmt.Errorf("shm create: %v", errno)
            }
            if shmid < 0 {
                return ErrCreateFailed
            }
            created = true
        }
        addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)  //挂接共享内存到当前进程
        if 0 != errno {
            return fmt.Errorf("shmat: %v", errno)
        }
        if created {
            zero(addr, s.total)//新创建的共享内存,初始化共享内存数据
        }
        s.base = addr //记录共享内存首地址 用于之后的释放
        s.data = s.base + uintptr(s.pageSz) //写数据的起始地址
         
        _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
        if 0 != errno { //锁定共享内存头,锁指定的内存区间必须包含整个内存页(4K)
            s.detach()
            return fmt.Errorf("mprotect head: %v", errno)
        }
        _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //锁指定共享内存尾,区间开始的地址start必须是一个内存页的起始地址,并且区间长度len必须是页大小的整数倍。
        if 0 != errno {
            s.detach()
            return fmt.Errorf("mprotect tail: %v", errno)
        }
        return nil
    }
    func (s *ShmUtil) detach() { //进程去关联共享内存
        if 0 != s.base {
            syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)
            s.base = 0
            s.data = 0
        }
    }
    /*
      获取内存并且返回数据段起始位置
      s.create 决定是否新申请共享内存
    */
    func (s *ShmUtil) GetData() (uintptr, error) {
        if s.data != 0 {
            return s.data, nil
        }
        if err := s.attachShm(0666); nil != err { //初始化共享内存,并关联到进程
            return 0, err
        }
        return s.data, nil
    }
    func SwitchAndFetch(ptr uintptr) [][3]uint32 { //从共享内存读取 [][3]uint32{ossid,key,value}
        if ptr == 0 {
            return nil
        }
        m1 := (*uint32)(unsafe.Pointer(ptr))
        m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))   
        if MagicNum1 != *m1 || MagicNum2 != *m2 {
            logrus.Errorf("magic 1 in header: wrote:%v\tread:%v\n", MagicNum1, *m1)
            logrus.Errorf("magic 2 in tail:   wrote:%v\tread:%v\n", MagicNum2, *m2)
            return nil
        }
        idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切换块标志
        old := *idx
        *idx = 1 - *idx
        ret := PartialRead(ptr, old)  //读取当前idx块数据
        zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //读完清0
        return ret
    }
    //根据idx轮流读数据区域
    func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根据idx获取块起始地址
        startPtr := ptr + 8 + uintptr(idx)*OssMapSz
        ret := ReadOssMap(startPtr)
        log.Printf("result: %v\n", ret)
        return ret
    }
    func ReadOssMap(ptr uintptr) [][3]uint32 { //1个周期内的指标总容量为 128*1024 = 128k  = 13W
        var ret [][3]uint32
        var i uint32 = 0
        for i = 0; i < OssMapOneAttrCnt; i++ {  //1个周期最多支持1024个业务
            for _, v := range ReadOneAttr(ptr) {
                ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]
            }
            ptr += OneAttrSz  // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4
        }
        return ret
    }
    func ReadOneAttr(ptr uintptr) [][2]uint32 {
        var ret [][2]uint32
        var i uint32 = 0
        for i = 0; i < OssOneAttrEntryCnt; i++ { //目前默认一个业务下最多有128单维度指标, OssOneAttrEntryCnt = 128
            v := *(*uint32)(unsafe.Pointer(ptr))
            if v != 0 {
                ret = append(ret, [2]uint32{i, v}) // [keyID, value]
            }
            ptr += EntrySz  // 4yte 读取一个指标
        }
        return ret
    }

    以上就是怎么解析共享内存原理与VCS监控采集实战,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

    内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/148967.html

    (0)

    相关推荐

    • 互联网中元宇宙概念指的是什么意思

      技术互联网中元宇宙概念指的是什么意思这篇文章将为大家详细讲解有关互联网中元宇宙概念指的是什么意思,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 元宇宙不是特

      2021年11月2日
    • h2so3,初三必背的化学式有那些

      技术h2so3,初三必背的化学式有那些初中化学知识点摘要 一、基本概念h2so3:
      1、 化学变化:无新物质生成的变化。如:蒸发、挥发、溶解、潮解等。
      物理变化:有新物质生成的变化。如:燃烧、生锈、腐败、

      生活 2021年10月28日
    • QGIS如何连接Arcgis Server发布数据

      技术QGIS如何连接Arcgis Server发布数据这篇文章主要介绍了QGIS如何连接Arcgis Server发布数据,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家

      攻略 2021年11月28日
    • vue的axios请求(axios用法示例)

      技术axios是不是vue里面的小编给大家分享一下axios是不是vue里面的,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!axios不是vu

      攻略 2021年12月22日
    • impunity,犬儒主义到底是什么意思

      技术impunity,犬儒主义到底是什么意思犬儒主义者百科名片“犬儒主义”一般认为是苏格拉底的弟子安提斯泰尼创立的,另一人物第欧根尼则因为住在木桶里的怪异行为而成为更有名的犬儒主义者。当时奉行这一主义的哲学家或思想家,他

      生活 2021年10月30日
    • 如何使用Java的MD5工具类和客户端测试类

      技术Java的MD5工具类和客户端测试类怎么使用这篇文章主要讲解了“Java的MD5工具类和客户端测试类怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java的M

      攻略 2021年12月16日