GNU Radio: Overview of the GNU Radio Scheduler

Section 1: The Flowgraph

The flowgraph moves data from sources into sinks.

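A flowgraph is made up of multiple blocks, typically including source blocks and sink blocks, which are wired together to form a top-level block (the top_block class). Calling the top block's start() member function starts the flowgraph running under GNU Radio. While it runs, each block executes in its own thread, and the GNU Radio scheduler carries the data produced by the source blocks through a chain of processing blocks until it reaches the sink blocks. This involves hardware drivers, data buffer management, and thread scheduling. The data buffers are managed as zero-copy circular buffers (covered in Section 5), which lets the data stream from the source move efficiently between blocks.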

Example of data moving with rate changes.

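GNU Radio blocks are classified by the relationship between their input and output item counts: arbitrary ratio, 1:1, N:1, and 1:N (covered in Section 2). A sync block is 1:1, meaning the number of output items equals the number of input items. A decim block is N:1, meaning input:output = N:1. A decimator is how the data rate is reduced.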

The flowgraph must check the bounds to satisfy input/output requirements.

All input streams and output streams must satisfy the constraints.

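At runtime the flowgraph checks whether each block meets its input and output requirements. Every output port of a block is bound to a circular buffer, and the block writes data into it through a write pointer, w_ptr. The downstream block reads the data through a read pointer, r_ptr. A block can run only when its output buffer has enough free space and its input buffer holds enough readable data. For the sync block in the figure above, the block can run only when n_out >= 2048 && n_in >= 2048.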

The boundary conditions can change with rate changing blocks. 

Decimators need enough input to calculate the decimated output. 

The conditions are independently established with each block. 

This block is asking for less than it can on the input. 
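
As a rough illustration of these boundary checks, the following Python sketch computes how many output items a fixed-rate block could produce from the items readable on its input and the free space on its output. The function name max_noutput and its parameters are hypothetical and not part of the GNU Radio API. The sketch only models the arithmetic.

```python
def max_noutput(items_available, output_space, decimation=1):
    """Largest noutput_items a fixed-rate block could produce right now.

    items_available: readable items on the input buffer
    output_space:    free items on the output buffer
    decimation:      input items consumed per output item (1 for a sync block)
    """
    limited_by_input = items_available // decimation
    return min(limited_by_input, output_space)


# Sync block: 2048 readable items and 2048 free slots.
sync_n = max_noutput(2048, 2048)
# Decimate-by-4: the same 2048 readable items support fewer outputs.
decim_n = max_noutput(2048, 2048, decimation=4)
```

With a decimator the input side becomes the tighter bound, which is why the boundary conditions shift at rate-changing blocks.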

Section 2: The general_work and work functions

The input and output buffers

general_work and work each have two vectors passed to them.

int
block::general_work(int noutput_items,
                    gr_vector_int &ninput_items,
                    gr_vector_const_void_star &input_items,
                    gr_vector_void_star &output_items)
int
block::work(int noutput_items,
            gr_vector_const_void_star &input_items,
            gr_vector_void_star &output_items)
  • input_items is a vector of pointers to input buffers.
  • output_items is a vector of pointers to output buffers. 

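As mentioned earlier, each block runs in its own thread, and general_work() / work() is the function that thread calls to do its processing. Two vectors, input_items and output_items, give access to the input and output buffers for reading and writing. The sync block in the figure above has two input ports and one output port, so input_items[0] and input_items[1] are the read pointers for the two input ports and output_items[0] is the write pointer for the output port.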

general_work has no fixed input/output relationship

It's told the number of output and input items:

int
block::general_work(int noutput_items,
                    gr_vector_int &ninput_items,
                    gr_vector_const_void_star &input_items,
                    gr_vector_void_star &output_items)
  • noutput_items: minimum number of output items available across all output buffers.
  • ninput_items: vector of items available on all input buffers.

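A block that implements general_work has an arbitrary input/output relationship, which it manages through noutput_items and ninput_items. Note that noutput_items is a single integer while ninput_items is a vector. This is because GNU Radio requires every output port of a block to produce the same number of items, but places no such requirement on the inputs. For the sync block in the figure above, ninput_items[0] and ninput_items[1] give the number of items on the two input ports, and noutput_items gives the number of items for all output ports.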
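
To make the arbitrary relationship concrete, here is a minimal pure-Python sketch of a general_work-style function that copies only nonzero samples, so the number of items consumed depends on the data. The function drop_zeros_general_work and its return convention are illustrative assumptions. In GNU Radio itself a block reports consumption through consume() or consume_each() and returns the number of items produced.

```python
def drop_zeros_general_work(noutput_items, ninput_items,
                            input_items, output_items):
    """Copy nonzero samples only; the input/output ratio depends on the data.

    Returns (consumed, produced) so the caller can advance both pointers.
    """
    inp, out = input_items[0], output_items[0]
    consumed = produced = 0
    # Stop when the input runs out or the output space is used up.
    while consumed < ninput_items[0] and produced < noutput_items:
        if inp[consumed] != 0:
            out[produced] = inp[consumed]
            produced += 1
        consumed += 1
    return consumed, produced


out_buf = [None] * 3
consumed, produced = drop_zeros_general_work(
    3, [6], [[0, 5, 0, 7, 9, 4]], [out_buf])
```

Here five input items are consumed to fill three output slots, a ratio that no sync, decimator, or interpolator block could express.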

Number of input and output items?

noutput_items: how many output items work can produce

  • general_work: no guaranteed relationship between inputs and outputs.
  • work: knowing noutput_items tells us ninput_items based on the established relationship
    • gr::sync_block: ninput_items[i] = noutput_items
    • gr::sync_decimator: ninput_items[i] = noutput_items*decimation()
    • gr::sync_interpolator: ninput_items[i] = noutput_items/interpolation()
  • Because of the input/output relationship of a sync block, only need to know one side

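A block that implements work has a fixed input/output relationship. There are three kinds: gr::sync_block, gr::sync_decimator, and gr::sync_interpolator, with input:output ratios of 1:1, N:1, and 1:N respectively. work() has no ninput_items parameter, because ninput_items can be computed from noutput_items using these ratios.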

work operates off just noutput_items

From this number, we infer how many input items we have:

int
block::work(int noutput_items,
            gr_vector_const_void_star &input_items,
            gr_vector_void_star &output_items)
  • noutput_items: minimum number of output items available across all output buffers.
  • ninput_items: calculated from noutput_items and type of sync block.
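
The sketch below shows, in plain Python rather than the real C++ API, how a decimator's work-style function can infer its input count from noutput_items alone. The name decimator_work and the use of Python lists as buffers are assumptions made for illustration.

```python
def decimator_work(noutput_items, input_items, output_items, decimation):
    """Keep every `decimation`-th input sample, in the style of work().

    input_items / output_items are lists of per-port buffers (plain lists).
    Returns the number of output items produced.
    """
    inp = input_items[0]
    out = output_items[0]
    # A decimator infers its input count from noutput_items.
    ninput_items = noutput_items * decimation
    assert len(inp) >= ninput_items, "caller must supply enough input"
    for i in range(noutput_items):
        out[i] = inp[i * decimation]
    return noutput_items


samples = list(range(12))
result = [None] * 4
produced = decimator_work(4, [samples], [result], decimation=3)
```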

Section 3: Scheduler's job

Overview

The scheduler handles the buffer states, block requirements, messages, and stream tags. 

"A stream of samples is much more interesting when there is parsable metadata connected to that stream, such as the time of reception, center frequency, subframe index or even protocol-specific information. So there must be a way to identify PDU boundaries, pass control data between blocks. GNU Radio supports two ways to do this: Message passing and stream tag."

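GNU Radio uses two mechanisms, message passing and stream tags, to carry information between blocks, such as the time of reception, center frequency, subframe index, or protocol-specific information. Stream tags are synchronous and travel in one direction only. Messages are asynchronous and can be sent in any direction. In GNU Radio, stream connections are drawn as solid lines and message connections as dashed lines. Note that stream tags travel in parallel with the data stream: they are not inserted into the sample stream and do not alter it, but are attached to a specific sample. They only carry information between blocks and cannot be transmitted over the antenna.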

Message Passing Layer
Send commands, metadata, and packets between blocks.
Asynchronous messages from and to any block:

tb.msg_connect(Blk1, "out port", Blk0, "in port")
tb.msg_connect(Blk2, "out port", Sink, "in port")

Scheduler Handles the Asynchronous Message Passing

Asynchronous Message Passing:

  • When a message is posted, it is placed in each subscriber's queue.
  • Messages are handled before general_work is called.
  • The scheduler dispatches the messages:
    • Checks if there is a handler for the message type.
      • If there is no handler, a queue of max_nmsgs is held.
      • Oldest message is dropped if more than max_nmsgs in queue.
      • max_nmsgs is set in the preferences file in [DEFAULT]:max_messages.
    • Pops the message off the queue.
    • Dispatches the message by calling the block's handler.
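
The queueing behaviour described above can be sketched in a few lines of Python. The class MessagePort is hypothetical and is not the GNU Radio implementation. For simplicity it bounds the queue at max_nmsgs whether or not a handler is registered.

```python
from collections import deque


class MessagePort:
    """Toy input message port with a bounded queue and an optional handler."""

    def __init__(self, max_nmsgs, handler=None):
        # deque(maxlen=...) discards the oldest entry when full.
        self.queue = deque(maxlen=max_nmsgs)
        self.handler = handler

    def post(self, msg):
        self.queue.append(msg)

    def dispatch_all(self):
        """Deliver queued messages to the handler, oldest first."""
        if self.handler is None:
            return 0  # nothing to call; messages stay queued
        delivered = 0
        while self.queue:
            self.handler(self.queue.popleft())
            delivered += 1
        return delivered


received = []
port = MessagePort(max_nmsgs=3, handler=received.append)
for n in range(5):
    port.post(n)  # messages 0 and 1 are dropped once the queue is full
count = port.dispatch_all()
```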

Stream tag layer

Adds a Control, Logic, and Metadata layer to data flow

Tags carry key/value data associated with a specific sample.

Tags are propagated downstream through each block.

Tags are updated by data rate changes.

#ifndef INCLUDED_GR_TAGS_H
#define INCLUDED_GR_TAGS_H

#include <gnuradio/api.h>
#include <pmt/pmt.h>

namespace gr {

  struct GR_RUNTIME_API tag_t
  {
    //! the item \p tag occurred at (as a uint64_t)
    uint64_t offset;

    //! the key of \p tag (as a PMT symbol)
    pmt::pmt_t key;

    //! the value of \p tag (as a PMT)
    pmt::pmt_t value;

    //! the source ID of \p tag (as a PMT)
    pmt::pmt_t srcid;

    //! Used by gr_buffer to mark a tagged as deleted by a specific block. You can usually ignore this.
    std::vector<long> marked_deleted;

    /*!
     * Comparison function to test which tag, \p x or \p y, came
     * first in time
     */
    static inline bool offset_compare(const tag_t &x,
                                      const tag_t &y)
    {
      return x.offset < y.offset;
    }

    inline bool operator == (const tag_t &t) const
    {
      return (t.key == key) && (t.value == value) &&
      (t.srcid == srcid) && (t.offset == offset);
    }

    tag_t()
      : offset(0),
        key(pmt::PMT_NIL),
        value(pmt::PMT_NIL),
        srcid(pmt::PMT_F)    // consistent with default srcid value in block::add_item_tag
    {
    }

    ~tag_t()
    {
    }
  };

} /* namespace gr */

#endif /*INCLUDED_GR_TAGS_H*/
Listing: tags.h

Note: a tag's position changes after it passes through a rate-changing block.

Propagate tags downstream based on the tag_propagation_policy. 

block.h defines three tag propagation policies:

enum tag_propagation_policy_t {
    TPP_DONT = 0,
    TPP_ALL_TO_ALL = 1,
    TPP_ONE_TO_ONE = 2
};

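The default is TPP_ALL_TO_ALL, in which a tag received on any input port is passed to every output port. With TPP_ONE_TO_ONE, a tag received on input port i is passed only to output port i. TPP_DONT means the block does not pass received tags on to downstream blocks.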

Tag propagation:

  • tag_propagation_policy typically set in block's constructor.
    • Defaults to block::TPP_ALL_TO_ALL.
  • Called after general_work.
  • If propagating:
    • Gets tags in window of last work function.
    • If relative_ratio is 1, copies all tags as is.
    • Otherwise, adjusts offset of tag based on relative_ratio.
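
A minimal Python sketch of the three policies and the offset adjustment follows. The function propagate_tags, the tuple layout for tags, and the round-to-nearest rule are assumptions chosen for illustration. The exact rounding used by the scheduler is not stated here.

```python
TPP_DONT, TPP_ALL_TO_ALL, TPP_ONE_TO_ONE = 0, 1, 2


def propagate_tags(tags_per_input, n_outputs, policy, relative_rate=1.0):
    """Return a list of tag lists, one per output port.

    tags_per_input: one list per input port of (offset, key, value) tuples
    relative_rate:  output items per input item (0.25 for decimate-by-4)
    """
    outputs = [[] for _ in range(n_outputs)]
    if policy == TPP_DONT:
        return outputs  # the block handles (or drops) tags itself

    def rescale(tag):
        offset, key, value = tag
        if relative_rate == 1.0:
            return tag  # copied as is
        # Assumption: round to the nearest output item index.
        return (int(offset * relative_rate + 0.5), key, value)

    for i, tags in enumerate(tags_per_input):
        if policy == TPP_ALL_TO_ALL:
            targets = range(n_outputs)
        else:  # TPP_ONE_TO_ONE
            targets = [i] if i < n_outputs else []
        for o in targets:
            outputs[o].extend(rescale(t) for t in tags)
    return outputs


tags = [[(100, "rx_time", 1.5)], [(40, "freq", 2.4e9)]]
all_to_all = propagate_tags(tags, 2, TPP_ALL_TO_ALL, relative_rate=0.25)
one_to_one = propagate_tags(tags, 2, TPP_ONE_TO_ONE)
dont = propagate_tags(tags, 2, TPP_DONT)
```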

Review of propagation policies
block::TPP_ALL_TO_ALL

block::TPP_ONE_TO_ONE

block::TPP_DONT

  • Tags are not propagated and are removed from the stream.
  • Can allow block to handle propagation on its own.

Alignment

set_alignment(int multiple)

Set alignment in number of items.

  • Restricts number of items available to multiple of alignment.
  • Not guaranteed, but recovers quickly if misalignment is unavoidable.

Output Multiple

set_output_multiple(int multiple)

  • Restricts number of items available to set multiple.
  • Similar to alignment, but this is guaranteed.
  • If not enough for alignment, will wait until there is.
  • Cannot be set dynamically.
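
The effect of an output multiple on the item count can be sketched as a round-down. The helper name round_down_to_multiple is hypothetical. A result of 0 stands for the "wait until there is enough" case.

```python
def round_down_to_multiple(noutput_items, multiple):
    """Largest multiple of `multiple` that does not exceed noutput_items."""
    return (noutput_items // multiple) * multiple


# 1000 free items with an output multiple of 64.
usable = round_down_to_multiple(1000, 64)
# Fewer than one full multiple available: nothing can be produced yet.
too_small = round_down_to_multiple(50, 64)
```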

Forecast

Virtual function of the class that a block can override

Tells scheduler how many input items are required for each output item.

  • Given noutput_items, calculates ninput_items[i] for each input stream.
    • Default: ninput_items[i]=noutput_items+history()-1;
    • Decim: ninput_items[i]=noutput_items*decimation()+history()-1;
    • Interp: ninput_items[i]=noutput_items/interpolation()+history()-1;
  • Use this to reduce the book-keeping checks in a block.
    • Can guarantee ninput_items[i] > noutput_items.
    • Don't have to check both conditions.
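
The three default formulas can be collected into one small Python function. The name forecast mirrors the method discussed above, but the kind and factor parameters are hypothetical conveniences for this sketch and do not exist in the C++ signature.

```python
def forecast(noutput_items, n_inputs, kind="sync", factor=1, history=1):
    """Return ninput_items_required for each input stream.

    kind:   "sync", "decim" or "interp"
    factor: the decimation() or interpolation() value
    """
    if kind == "sync":
        base = noutput_items
    elif kind == "decim":
        base = noutput_items * factor
    elif kind == "interp":
        base = noutput_items // factor
    else:
        raise ValueError(f"unknown block kind: {kind}")
    return [base + history - 1] * n_inputs


sync_req = forecast(1024, n_inputs=2)
decim_req = forecast(256, n_inputs=1, kind="decim", factor=4, history=8)
interp_req = forecast(1024, n_inputs=1, kind="interp", factor=4)
```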

History

set_history(nitems+1)

History sets read pointer history() items back in time.

  • Makes sure we have valid data history() items beyond noutput_items.
  • Used to allow causal signals between calls to work.
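
A FIR filter is the usual reason to set history. The pure-Python sketch below assumes history equals the number of taps, so the input buffer holds noutput_items + len(taps) - 1 samples on every call. The function fir_work and the list-based buffers are illustrative only.

```python
def fir_work(noutput_items, input_items, output_items, taps):
    """FIR filter written in the style of a sync block's work().

    With history = len(taps), input_items[0] holds
    noutput_items + len(taps) - 1 samples; the first len(taps) - 1 of
    them are left over from the previous call.
    """
    inp, out = input_items[0], output_items[0]
    ntaps = len(taps)
    for i in range(noutput_items):
        window = inp[i:i + ntaps]
        # taps[0] weights the newest sample in the window.
        out[i] = sum(t * x for t, x in zip(taps, reversed(window)))
    return noutput_items


taps = [0.5, 0.5]              # two-tap average, so history would be 2
stream = [0.0, 2.0, 4.0, 6.0]  # one old sample followed by three new ones
out = [0.0] * 3
fir_work(3, [stream], [out], taps)
```

Without the extra old sample, the first output of each call would have to be computed from an incomplete window.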

Buffer Size and Controlling Flow and Latency 

Set of features that affect the buffers 

  • set_max_noutput_items(int)
    • Caps the maximum noutput_items.
    • Will round down to nearest output multiple, if set.
    • Does not change the size of any buffers.
  • set_max_output_buffer(long)
    • Sets the maximum buffer size for all output buffers.
    • Buffer calculations are based on a number of factors, this limits overall size.
    • On most systems, will round to nearest page size.
  • set_min_output_buffer(long)
    • Sets the minimum buffer size for all output buffers.
    • On most systems, will round to nearest page size.

Scheduler Manages the Data Stream Conditions

General tasks:

  1. Calculate how many items are available on the input.
  2. Calculate how much space is available on the output.
  3. Determine restrictions: alignment, output_multiple, forecast requirements, etc.
  4. Adjust as necessary or abort and try again.
  5. Call the general_work function and pass appropriate pointers and number of items.
  6. Take returned info from general_work to update the pointers in the gr::buffer and gr::buffer_reader objects.

Section 4: Scheduler Flow Chart

Scheduler Flow Chart: top_block.start()

Start in scheduler_tpb.cc

Initialize thread for each block:

Each block's thread runs the loop until done

Handles messages, state, and calls run_one_iteration: 

run_one_iteration in block_executor.cc

Start of the iteration:

run_one_iteration::try_again

If block has inputs (sinks/blocks), handle input/output reqs.:

run_one_iteration::try_again: Fixed Rate

Fixed rate blocks have special restrictions:  

run_one_iteration::try_again: Alignment

Works to keep buffers aligned if possible:

run_one_iteration::try_again: Failure

If something goes wrong, try again, fail, or block and wait: 

run_one_iteration::setup_call_to_work

Call work and do book-keeping:

run_one_iteration::were_done

When the flowgraph can't continue, end it:

"Get items_available for all inputs"

Gets difference between write pointers and read pointers for all inputs:

"Calc space on output buffer"

Space available is the difference between the write pointer and the first read pointer. noutput_items is the minimum across all output buffers:
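
The pointer arithmetic for both quantities can be sketched with a small index-only ring buffer. The class RingBuffer is hypothetical, and keeping one slot free to tell a full buffer from an empty one is an assumption of this sketch.

```python
class RingBuffer:
    """Index bookkeeping only; no sample storage."""

    def __init__(self, size, n_readers):
        self.size = size
        self.write_index = 0
        self.read_index = [0] * n_readers

    def items_available(self, reader):
        """Items written but not yet read by this reader."""
        return (self.write_index - self.read_index[reader]) % self.size

    def space_available(self):
        """Free slots, limited by the reader that is furthest behind."""
        most_unread = max(self.items_available(r)
                          for r in range(len(self.read_index)))
        # Assumption: one slot stays free so full and empty differ.
        return self.size - 1 - most_unread

    def produce(self, n):
        assert n <= self.space_available()
        self.write_index = (self.write_index + n) % self.size

    def consume(self, reader, n):
        assert n <= self.items_available(reader)
        self.read_index[reader] = (self.read_index[reader] + n) % self.size


buf = RingBuffer(size=8192, n_readers=2)
buf.produce(3000)
buf.consume(0, 3000)  # fast reader
buf.consume(1, 1000)  # slow reader still has 2000 unread items
```

The slowest reader determines how much the writer may produce, which is how one stalled downstream block applies back-pressure to everything upstream.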

"call forecast, sets ninput_items_required"

Given noutput_items, forecast calculates the required number of items available for each input.

void
sync_decimator::forecast(int noutput_items,
                         gr_vector_int &ninput_items_required)
{
    unsigned ninputs = ninput_items_required.size();
    for(unsigned i = 0; i < ninputs; i++)
        ninput_items_required[i] = 
            fixed_rate_noutput_to_ninput(noutput_items);
}

int
sync_decimator::fixed_rate_noutput_to_ninput(int noutput_items)
{
    return noutput_items * decimation() + history() - 1;
}

"Do all inputs have nitems req.?"

Tests that items_available[i] >= ninput_items_required[i] for all i.

  • If yes, run the setup_call_to_work section.
  • Otherwise, we're in a fail mode:
    • If we still have enough output space, goto try_again.
    • If the input is marked done, goto were_done.
    • If block requires more than is possible, goto were_done.
    • Otherwise, we're blocked so we exit and will start over on next iteration.
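
The decision list above can be written as a small function. This is a sketch under stated assumptions: next_step and its parameters are hypothetical names, "still have enough output space" is interpreted as "noutput_items can still be reduced below its current value", and only the first starved input is examined.

```python
def next_step(items_available, ninput_items_required, input_done,
              noutput_items, output_multiple, max_possible_items):
    """Return 'setup_call_to_work', 'try_again', 'were_done' or 'blocked'."""
    short = [i for i, (avail, req)
             in enumerate(zip(items_available, ninput_items_required))
             if avail < req]
    if not short:
        return "setup_call_to_work"
    i = short[0]  # first input that lacks data
    if noutput_items > output_multiple:
        return "try_again"   # ask for fewer outputs and re-run forecast
    if input_done[i]:
        return "were_done"   # upstream finished; no more data will arrive
    if ninput_items_required[i] > max_possible_items[i]:
        return "were_done"   # the request can never be satisfied
    return "blocked"         # wait for upstream to produce more


ready = next_step([100], [64], [False], 64, 1, [8191])
retry = next_step([10], [64], [False], 64, 1, [8191])
finished = next_step([0], [1], [True], 1, 1, [8191])
impossible = next_step([0], [9000], [False], 1, 1, [8191])
waiting = next_step([0], [1], [False], 1, 1, [8191])
```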

Section 5: Buffer Creation

Buffers are handled almost completely behind the scenes

Standard Creation

  • GNU Radio selects the best option for how to create buffers.
  • Allocated at the start of a page.
  • Length is a multiple of the page size.
  • Memory mapped second half for easy circular buffer.
  • Guard pages on either side.

User controls

  • Minimum buffer size.
  • Maximum buffer size.

Circular buffers in memory

Shows guard pages and memory-mapped half
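
The benefit of the memory-mapped second half is that a reader can take a contiguous run of items even when the run crosses the end of the buffer. The Python class below only simulates that aliasing with modular indexing. It does not perform any real memory mapping, and DoubleMappedBuffer is a hypothetical name.

```python
class DoubleMappedBuffer:
    """Simulates a buffer whose second half aliases the first half.

    Real implementations map the same memory twice, back to back; here
    index i and index i + size simply refer to the same list slot.
    """

    def __init__(self, size):
        self.size = size
        self._storage = [0] * size

    def __setitem__(self, index, value):
        self._storage[index % self.size] = value

    def read(self, start, count):
        """Read `count` items from `start` as one contiguous run.

        The run may extend into the aliased second half, so callers
        need no wrap-around branch.
        """
        assert 0 <= start < self.size and 0 <= count <= self.size
        return [self._storage[i % self.size]
                for i in range(start, start + count)]


buf = DoubleMappedBuffer(8)
for i in range(8):
    buf[i] = i * 10
window = buf.read(6, 4)   # crosses the end of the first copy
buf[9] = -1               # a write through the second copy...
aliased = buf.read(1, 1)  # ...is visible through the first
```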

Buffer creation techniques

Controlled by the vmcircbuf classes

  • Selects from:
    • vmcircbuf_createfilemapping.h
    • vmcircbuf_sysv_shm.h
    • vmcircbuf_mmap_shm_open.h
    • vmcircbuf_mmap_tmpfile.h
  • Reads from a preference file, if set.
  • Tests all factories, saves preferred to preference file.

Buffer creation: Create File Mapping

Generally used for MS Windows

  • size required to be a multiple of the page size.
    • Uses CreateFileMapping to get a handle to paging file.
  • Allocates virtual memory of 2*size.
    • Uses VirtualAlloc to get first_tmp.
  • Map the paging file to the first half of the virtual memory.
    • Uses MapViewOfFileEx with first_tmp as pointer base.
  • Map the paging file to the second half of the virtual memory.
    • Uses MapViewOfFileEx with first_tmp+size as pointer base.
  • Both first and second half are mapped to the same paging file.

Buffer creation: Memory-mapped Temp File

Generally used for OSX

  • size required to be a multiple of the page size.
    • Creates a temp file with permissions 0600 (octal).
  • Uses unlink to hide file and remove it when program closes.
  • Sets length of temp file to 2*size.
    • Uses ftruncate.
  • Map the first half of the file to a pointer first_copy.
    • Uses mmap to point to start of temp file.
  • Map the second half of the file to a pointer second_copy.
    • Uses mmap to point to first_copy+size.
  • Resets temp file to size with ftruncate.
  • Uses first_copy as the Buffer's base address.

Buffer creation: System V Shared Memory

Generally used for Linux/POSIX

  • size required to be a multiple of the page size.
    • Uses shmget to get 2*size (plus guard pages) as shmid2.
    • Uses shmget to get size as shmid1.
  • Attach shmid1 to first half of shmid2 with shmat.
  • Attach shmid1 to second half of shmid2 with shmat.
  • Memory in both halves of shmid2 is mapped to the same virtual space.
  • Keep guard pages as read-only.
  • Return memory in shmid2+pagesize as Buffer base location.
  • Keeps 2*size allocated.

Buffer creation: Memory-mapped Shared Memory

Alternative implementation for Linux/POSIX

  • size required to be a multiple of the page size.
    • Creates a shared memory segment with shm_open.
  • Sets length of memory segment to 2*size.
    • Uses ftruncate.
  • Map the first half of the file to a pointer first_copy.
    • Uses mmap to point to start of memory segment.
  • Map the second half of the file to a pointer second_copy.
    • Uses mmap to point to first_copy+size.
  • We should reset memory segment to size with ftruncate.
    • On OSX this isn't allowed, though; not actually compiled.
  • Uses first_copy as the buffer's base address.

VM circular buffer preference setting

Working VM Circular Buffer technique is stored in a prefs file

  • Handled by vmcircbuf_prefs class.
  • Path: $HOME/.gnuradio/prefs/vmcircbuf_default_factory
  • Single line that specifies the default factory function:
    • e.g., gr::vmcircbuf_sysv_shm_factory
  • If no file, we find the best version and store it here.
  • Should only be created once on a machine when GNU Radio is first run.

Building a gr::buffer

Buffers are built and attached at runtime

  • When start is called, flowgraph is flattened and connections created.
  • gr::block_details are created and a gr::buffer for each output.
    • Buffer size is calculated as the number of items to hold.
      • min/max restrictions applied, if set.
  • Connects inputs by attaching a gr::buffer_reader.

Calculating gr::buffer size

gr::flat_flowgraph::allocate_buffer

  • Takes in item_size.
  • Calculates number of items: nitems = s_fixed_buffer_size*2/item_size.
    • s_fixed_buffer_size = 32768 bytes.
    • Doubling the size to allow double buffering.
  • Checks that nitems is at least 2x output_multiple.
  • Checks max_output_buffer & min_output_buffer settings.
    • Both default to -1, which means no limit.
  • Checks that nitems is greater than decimation*output_multiple+history.
    • Must have enough to read in all of this at one time.
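
The sizing steps listed above can be sketched as one Python function. The name buffer_nitems, the order in which the min/max limits are applied, and the choice to grow the buffer to exactly decimation*output_multiple+history when the estimate is too small are all assumptions of this sketch.

```python
S_FIXED_BUFFER_SIZE = 32 * 1024  # bytes, as quoted above


def buffer_nitems(item_size, output_multiple=1, decimation=1, history=1,
                  min_output_buffer=-1, max_output_buffer=-1):
    """Estimate the number of items an output buffer should hold."""
    # Twice the fixed size to allow double buffering.
    nitems = S_FIXED_BUFFER_SIZE * 2 // item_size
    # Hold at least two output multiples.
    nitems = max(nitems, 2 * output_multiple)
    # Optional user limits; -1 means "not set".
    if max_output_buffer > 0:
        nitems = min(nitems, max_output_buffer)
    if min_output_buffer > 0:
        nitems = max(nitems, min_output_buffer)
    # Must hold everything a reader needs at one time.
    # Assumption: grow to exactly that amount if the estimate is too small.
    needed = decimation * output_multiple + history
    return max(nitems, needed)


complex_items = buffer_nitems(item_size=8)  # 8-byte items
capped = buffer_nitems(item_size=8, max_output_buffer=4096)
big_multiple = buffer_nitems(item_size=8, output_multiple=10000)
```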

Calculating gr::buffer size: granularity

gr::buffer::allocate_buffer handles the actual creation

  • Checks if we have the minimum number of items:
    • Based on system granularity (i.e., page size) and item size.
    • Rounds up to a multiple of this minimum number of items.
  • Rounding up based on item size may result in unusual buffer sizes.
    • We handle this; just sends a warning to the user.
  • Calls VM circular buffer default factory function.
  • Sets d_base, the address in memory for the buffer, from the circular buffer.
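
The granularity rounding can be illustrated with a greatest-common-divisor calculation: the buffer's byte length must be a multiple of both the item size and the granularity. The function names and the 4096-byte default granularity below are assumptions made for this sketch.

```python
from math import gcd


def minimum_buffer_items(item_size, granularity):
    """Smallest item count whose byte length is a multiple of granularity."""
    return granularity // gcd(item_size, granularity)


def round_up_nitems(nitems, item_size, granularity=4096):
    """Round nitems up to a multiple of minimum_buffer_items()."""
    step = minimum_buffer_items(item_size, granularity)
    return ((nitems + step - 1) // step) * step


aligned = round_up_nitems(8192, item_size=8)  # already a multiple
odd = round_up_nitems(5461, item_size=12)     # 12-byte items need rounding
```

Item sizes that share few factors with the page size force large steps, which is where the unusual buffer sizes mentioned above come from. For example, a 3-byte item needs a multiple of 4096 items under these assumptions.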

Controlling the size of buffers: min/max

User interface allows us to set min/max buffer for all blocks

  • Methods of gr::block:
    • gr::block::set_min_output_buffer(port, length)
    • gr::block::set_max_output_buffer(port, length)
  • Methods to set all ports the same:
    • gr::block::set_min_output_buffer(length)
    • gr::block::set_max_output_buffer(length)
  • Will still round up to the nearest granularity of a buffer.
  • Can only be set before runtime to have an effect.

Section 6: Wrap-up

Review:

This presentation covered:

  • The responsibility of the scheduler.
  • An understanding of the user interaction with the scheduler.
  • The roles the scheduler plays in the three data streams:
    • Overview of the data stream, message passing, and tag streams.
    • Alignment, output multiple, forecast, and history.
  • Flow chart of the threaded loops each block runs.
    • Launching the thread body.
    • Handling messages.
    • Calling run_one_iteration and its tasks.
  • In-depth look into how the scheduler makes its calculations.
  • Buffer structure, calculations.

Purpose:

From the information in this presentation, you should be able to:

  • Better interact with the three data stream models
  • Use the features of the data flow model (forecast, history, etc.) to improve logic, performance
  • Understand how the buffer system works and how to extend or alter it for different architectures

Reference: Overview of the GNU Radio Scheduler

Reference: Explaining the GNU Radio Scheduler

Original post: https://www.cnblogs.com/moon1992/p/5959806.html