[osg][osgearth]osg的分页加载,代码和结构图

DatabasePager加载数据的过程:

多线程使用DatabasePager加载数据的流程:

  左侧的图框表示数据的检索和输入,中间的白色框表示用于数据存储的内存空间,而右边的图框表示存储数据的输出。此外,蓝色图框表示可以在DatabaseThread线程中完成的工作,而橙色图框表示由线程之外的函数完成的工作。

  // DatabasePager::DatabaseThread::run -- body of one paging worker thread.
  // (The listing below carries its own 1-324 line numbers.)
  //
  // Each pass of the do/while loop:
  //   1. waits on the thread's read queue and leaves the loop if _done is set;
  //   2. if enabled on the pager, swaps the queue's children-to-delete list into a
  //      local list so those objects are released outside the queue lock;
  //   3. takes one DatabaseRequest and copies the fields it needs under _dr_mutex;
  //   4. if node caching is requested, looks the file up in the Registry object cache
  //      and, on a hit, puts the request straight on the to-merge list;
  //   5. drops the request if it is more than one frame old, otherwise decides
  //      (according to _mode) whether to read it from the file cache, read it
  //      directly, or forward it to the HTTP request queue;
  //   6. reads the node, writes it to the file cache when it was not read from there,
  //      and finally adds the request to the to-compile or the to-merge list.
  // The loop repeats until testCancel() returns true or _done is set.
  1 void DatabasePager::DatabaseThread::run()
  2 {
  3     OSG_INFO<<_name<<": DatabasePager::DatabaseThread::run"<<std::endl;
  4 
  5 
  6     bool firstTime = true;
  7 
  8     osg::ref_ptr<DatabasePager::ReadQueue> read_queue; // queue this thread takes its requests from
  9     osg::ref_ptr<DatabasePager::ReadQueue> out_queue; // queue requests are forwarded to; only assigned for HANDLE_NON_HTTP
 10 
 11     switch(_mode) // select the input (and optional forwarding) queue for this thread's mode
 12     {
 13         case(HANDLE_ALL_REQUESTS):
 14             read_queue = _pager->_fileRequestQueue;
 15             break;
 16         case(HANDLE_NON_HTTP):
 17             read_queue = _pager->_fileRequestQueue;
 18             out_queue = _pager->_httpRequestQueue;
 19             break;
 20         case(HANDLE_ONLY_HTTP):
 21             read_queue = _pager->_httpRequestQueue;
 22             break;
 23     }
 24 
 25 
 26     do
 27     {
 28         _active = false; // flagged inactive for the duration of the block() call below
 29 
 30         read_queue->block(); // presumably waits until the queue has work or is released -- block() is defined elsewhere
 31 
 32         if (_done)
 33         {
 34             break;
 35         }
 36 
 37         _active = true;
 38 
 39         OSG_INFO<<_name<<": _pager->size()= "<<read_queue->size()<<" to delete = "<<read_queue->_childrenToDeleteList.size()<<std::endl;
 40 
 41 
 42 
 43         //
 44         // delete any children if required.
 45         //
 46         if (_pager->_deleteRemovedSubgraphsInDatabaseThread/* && !(read_queue->_childrenToDeleteList.empty())*/)
 47         {
 48             ObjectList deleteList; // goes out of scope at the end of this if-block, after the lock below is released
 49             {
 50                 // Don't hold lock during destruction of deleteList
 51                 OpenThreads::ScopedLock<OpenThreads::Mutex> lock(read_queue->_requestMutex);
 52                 if (!read_queue->_childrenToDeleteList.empty())
 53                 {
 54                     deleteList.swap(read_queue->_childrenToDeleteList); // take over the pending entries, leaving the queue's list empty
 55                     read_queue->updateBlock();
 56                 }
 57             }
 58         }
 59 
 60         //
 61         // load any subgraphs that are required.
 62         //
 63         osg::ref_ptr<DatabaseRequest> databaseRequest;
 64         read_queue->takeFirst(databaseRequest); // result is checked with valid() below
 65 
 66         bool readFromFileCache = false;
 67 
 68         osg::ref_ptr<FileCache> fileCache = osgDB::Registry::instance()->getFileCache(); // Registry-wide defaults; may be overridden per request below
 69         osg::ref_ptr<FileLocationCallback> fileLocationCallback = osgDB::Registry::instance()->getFileLocationCallback();
 70         osg::ref_ptr<Options> dr_loadOptions;
 71         std::string fileName;
 72         int frameNumberLastRequest = 0;
 73         bool cacheNodes = false;
 74         if (databaseRequest.valid())
 75         {
 76             {
 77                 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); // copy what we need out of the request while holding _dr_mutex
 78                 dr_loadOptions = databaseRequest->_loadOptions.valid() ? databaseRequest->_loadOptions->cloneOptions() : new osgDB::Options; // work on a clone (or a fresh Options) rather than the request's own options
 79                 dr_loadOptions->setTerrain(databaseRequest->_terrain);
 80                 dr_loadOptions->setParentGroup(databaseRequest->_group);
 81                 fileName = databaseRequest->_fileName;
 82                 frameNumberLastRequest = databaseRequest->_frameNumberLastRequest;
 83             }
 84 
 85 
 86             if (dr_loadOptions->getFileCache()) fileCache = dr_loadOptions->getFileCache(); // per-request file cache overrides the Registry one
 87             if (dr_loadOptions->getFileLocationCallback()) fileLocationCallback = dr_loadOptions->getFileLocationCallback(); // likewise for the location callback
 88 
 89             // disable the FileCache if the fileLocationCallback tells us that it isn't required for this request.
 90             if (fileLocationCallback.valid() && !fileLocationCallback->useFileCache()) fileCache = 0;
 91 
 92 
 93             cacheNodes = (dr_loadOptions->getObjectCacheHint() & osgDB::Options::CACHE_NODES)!=0;
 94             if (cacheNodes)
 95             {
 96                 //OSG_NOTICE<<"Checking main ObjectCache"<<std::endl;
 97                 // check the object cache to see if the file we want has already been loaded.
 98                 osg::ref_ptr<osg::Object> objectFromCache = osgDB::Registry::instance()->getRefFromObjectCache(fileName);
 99 
100                 // if no object with fileName in ObjectCache then try the filename appropriate for fileCache
101                 if (!objectFromCache && (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName)))
102                 {
103                     if (fileCache->existsInCache(fileName))
104                     {
105                         objectFromCache = osgDB::Registry::instance()->getRefFromObjectCache(fileCache->createCacheFileName(fileName));
106                     }
107                 }
108 
109 
110                 osg::Node* modelFromCache = dynamic_cast<osg::Node*>(objectFromCache.get()); // null when nothing was cached or the cached object is not a Node
111                 if (modelFromCache)
112                 {
113                     //OSG_NOTICE<<"Found object in cache "<<fileName<<std::endl;
114 
115                     // assign the cached model to the request
116                     {
117                         OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex);
118                         databaseRequest->_loadedModel = modelFromCache;
119                     }
120 
121                     // move the request to the dataToMerge list so it can be merged during the update phase of the frame.
122                     {
123                         OpenThreads::ScopedLock<OpenThreads::Mutex> listLock( _pager->_dataToMergeList->_requestMutex);
124                         _pager->_dataToMergeList->addNoLock(databaseRequest.get());
125                         databaseRequest = 0;
126                     }
127 
128                     // skip the rest of the do/while loop as we have done all the processing we need to do.
129                     continue; // in a do/while this jumps to the loop condition at the bottom
130                 }
131                 else
132                 {
133                     //OSG_NOTICE<<"Not Found object in cache "<<fileName<<std::endl;
134                 }
135 
136                 // need to disable any attempt to use the cache when loading, as we handle this ourselves to avoid threading conflicts
137                 {
138                     OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex);
139                     databaseRequest->_objectCache = new ObjectCache; // fresh cache object owned by this request
140                     dr_loadOptions->setObjectCache(databaseRequest->_objectCache.get()); // and used by the load options for this read
141                 }
142             }
143 
144 
145             // check if databaseRequest is still relevant
146             if ((_pager->_frameNumber-frameNumberLastRequest)<=1) // i.e. last requested no more than one frame before the pager's current frame number
147             {
148 
149                 // now check to see if this request is appropriate for this thread
150                 switch(_mode)
151                 {
152                     case(HANDLE_ALL_REQUESTS):
153                     {
154                         // this thread handles the load itself; only decide whether to read from the file cache
155                         if (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName))
156                         {
157                             if (fileCache->existsInCache(fileName))
158                             {
159                                 readFromFileCache = true;
160                             }
161                         }
162                         break;
163                     }
164                     case(HANDLE_NON_HTTP):
165                     {
166                         // check the cache first
167                         bool isHighLatencyFileRequest = false;
168 
169                         if (fileLocationCallback.valid())
170                         {
171                             isHighLatencyFileRequest = fileLocationCallback->fileLocation(fileName, dr_loadOptions.get()) == FileLocationCallback::REMOTE_FILE;
172                         }
173                         else  if (fileCache.valid() && fileCache->isFileAppropriateForFileCache(fileName))
174                         {
175                             isHighLatencyFileRequest = true; // without a callback, any file the file cache would accept is treated as high latency
176                         }
177 
178                         if (isHighLatencyFileRequest)
179                         {
180                             if (fileCache.valid() && fileCache->existsInCache(fileName))
181                             {
182                                 readFromFileCache = true; // already in the file cache, so this thread reads it from there
183                             }
184                             else
185                             {
186                                 OSG_INFO<<_name<<": Passing http requests over "<<fileName<<std::endl;
187                                 out_queue->add(databaseRequest.get()); // forward to the HTTP request queue (out_queue is assigned only in this mode)
188                                 databaseRequest = 0; // nothing left for this thread to load in this pass
189                             }
190                         }
191                         break;
192                     }
193                     case(HANDLE_ONLY_HTTP):
194                     {
195                         // accept all requests, as we'll assume only high latency requests will have got here.
196                         break;
197                     }
198                 }
199             }
200             else
201             {
202                 databaseRequest = 0; // request is more than one frame old: drop it without loading
203             }
204         }
205 
206 
207         if (databaseRequest.valid()) // still set only if this thread is to perform the read itself
208         {
209 
210             // load the data, note safe to write to the databaseRequest since once
211             // it is created this thread is the only one to write to the _loadedModel pointer.
212             //OSG_NOTICE<<"In DatabasePager thread readNodeFile("<<databaseRequest->_fileName<<")"<<std::endl;
213             //osg::Timer_t before = osg::Timer::instance()->tick();
214 
215 
216             // assume that readNode is thread safe...
217             ReaderWriter::ReadResult rr = readFromFileCache ?
218                         fileCache->readNode(fileName, dr_loadOptions.get(), false) :
219                         Registry::instance()->readNode(fileName, dr_loadOptions.get(), false);
220 
221             osg::ref_ptr<osg::Node> loadedModel;
222             if (rr.validNode()) loadedModel = rr.getNode();
223             if (rr.error()) OSG_WARN<<"Error in reading file "<<fileName<<" : "<<rr.message() << std::endl;
224             if (rr.notEnoughMemory()) OSG_INFO<<"Not enought memory to load file "<<fileName << std::endl;
225 
226             if (loadedModel.valid() && // store a freshly read model in the file cache, unless it was read from that cache
227                 fileCache.valid() &&
228                 fileCache->isFileAppropriateForFileCache(fileName) &&
229                 !readFromFileCache)
230             {
231                 fileCache->writeNode(*(loadedModel), fileName, dr_loadOptions.get());
232             }
233 
234             {
235                 OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex);
236                 if ((_pager->_frameNumber-databaseRequest->_frameNumberLastRequest)>1) // re-check relevance, this time against the request's current frame number
237                 {
238                     OSG_INFO<<_name<<": Warning DatabaseRquest no longer required."<<std::endl;
239                     loadedModel = 0; // discard the result; the request is then not added to any list below
240                 }
241             }
242 
243             //OSG_NOTICE<<"     node read in "<<osg::Timer::instance()->delta_m(before,osg::Timer::instance()->tick())<<" ms"<<std::endl;
244 
245             if (loadedModel.valid())
246             {
247                 loadedModel->getBound(); // return value unused; presumably called so the bound is computed in this thread -- TODO confirm
248 
249                 bool loadedObjectsNeedToBeCompiled = false;
250                 osg::ref_ptr<osgUtil::IncrementalCompileOperation::CompileSet> compileSet = 0;
251                 if (!rr.loadedFromCache())
252                 {
253                     // find all the compileable rendering objects
254                     DatabasePager::FindCompileableGLObjectsVisitor stateToCompile(_pager, _pager->getMarkerObject());
255                     loadedModel->accept(stateToCompile);
256 
257                     loadedObjectsNeedToBeCompiled = _pager->_doPreCompile && // pre-compile only if enabled, an IncrementalCompileOperation exists, and it reports work to do
258                                                     _pager->_incrementalCompileOperation.valid() &&
259                                                     _pager->_incrementalCompileOperation->requiresCompile(stateToCompile);
260 
261                     // move the databaseRequest from the front of the fileRequest to the end of
262                     // dataToCompile or dataToMerge lists.
263                     if (loadedObjectsNeedToBeCompiled)
264                     {
265                         // OSG_NOTICE<<"Using IncrementalCompileOperation"<<std::endl;
266 
267                         compileSet = new osgUtil::IncrementalCompileOperation::CompileSet(loadedModel.get());
268                         compileSet->buildCompileMap(_pager->_incrementalCompileOperation->getContextSet(), stateToCompile);
269                         compileSet->_compileCompletedCallback = new DatabasePagerCompileCompletedCallback(_pager, databaseRequest.get());
270                         _pager->_incrementalCompileOperation->add(compileSet.get(), false);
271                     }
272                 }
273                 else
274                 {
275                     OSG_NOTICE<<"Loaded from ObjectCache"<<std::endl;
276                 }
277 
278 
279                 {
280                     OpenThreads::ScopedLock<OpenThreads::Mutex> drLock(_pager->_dr_mutex); // publish the results on the request under _dr_mutex
281                     databaseRequest->_loadedModel = loadedModel;
282                     databaseRequest->_compileSet = compileSet;
283                 }
284                 // Dereference the databaseRequest while the queue is
285                 // locked. This prevents the request from being
286                 // deleted at an unpredictable time within
287                 // addLoadedDataToSceneGraph.
288                 if (loadedObjectsNeedToBeCompiled)
289                 {
290                     OpenThreads::ScopedLock<OpenThreads::Mutex> listLock(
291                         _pager->_dataToCompileList->_requestMutex);
292                     _pager->_dataToCompileList->addNoLock(databaseRequest.get()); // needs compiling first: goes on the to-compile list
293                     databaseRequest = 0;
294                 }
295                 else
296                 {
297                     OpenThreads::ScopedLock<OpenThreads::Mutex> listLock(
298                         _pager->_dataToMergeList->_requestMutex);
299                     _pager->_dataToMergeList->addNoLock(databaseRequest.get()); // nothing to compile: goes straight on the to-merge list
300                     databaseRequest = 0;
301                 }
302 
303             }
304 
305             // _pager->_dataToCompileList->pruneOldRequestsAndCheckIfEmpty();
306         }
307         else
308         {
309             OpenThreads::Thread::YieldCurrentThread(); // no request to load in this pass: give up the time slice
310         }
311 
312 
313         // go to sleep until the next time our thread gets scheduled.
314 
315         if (firstTime)
316         {
317             // do a yield to get round a peculiar thread hang when testCancel() is called
318             // in certain circumstances - of which there is no particular pattern.
319             YieldCurrentThread();
320             firstTime = false;
321         }
322 
323     } while (!testCancel() && !_done);
324 }
原文地址:https://www.cnblogs.com/lyggqm/p/6401084.html