source: XIOS/trunk/src/node/context.cpp @ 1199

Last change on this file since 1199 was 1191, checked in by rlacroix, 7 years ago

Do not use the "min_buffer_size" parameter also as maximum event size.

Using the "min_buffer_size" as maximum event size caused the maximum number of bufferable events to be completely underestimated which could affect performance.

The new solution is not perfect either as it overestimates the maximum number of bufferable events in most cases. In theory this could cause deadlocks but in practice it should be mostly safe.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.9 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18
19
20namespace xios {
21
22  shared_ptr<CContextGroup> CContext::root;
23
24   /// ////////////////////// Définitions ////////////////////// ///
25
26   CContext::CContext(void)
27      : CObjectTemplate<CContext>(), CContextAttributes()
28      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
29      , idServer_(), client(0), server(0)
30   { /* Ne rien faire de plus */ }
31
32   CContext::CContext(const StdString & id)
33      : CObjectTemplate<CContext>(id), CContextAttributes()
34      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
35      , idServer_(), client(0), server(0)
36   { /* Ne rien faire de plus */ }
37
38   CContext::~CContext(void)
39   {
40     delete client;
41     delete server;
42   }
43
44   //----------------------------------------------------------------
45   //! Get name of context
46   StdString CContext::GetName(void)   { return (StdString("context")); }
47   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
48   ENodeType CContext::GetType(void)   { return (eContext); }
49
50   //----------------------------------------------------------------
51
52   /*!
53   \brief Get context group (context root)
54   \return Context root
55   */
56   CContextGroup* CContext::getRoot(void)
57   {
58      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
59      return root.get();
60   }
61
62   //----------------------------------------------------------------
63
64   /*!
65   \brief Get calendar of a context
66   \return Calendar
67   */
68   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
69   {
70      return (this->calendar);
71   }
72
73   //----------------------------------------------------------------
74
75   /*!
76   \brief Set a context with a calendar
77   \param[in] newCalendar new calendar
78   */
79   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
80   {
81      this->calendar = newCalendar;
82   }
83
84   //----------------------------------------------------------------
85   /*!
86   \brief Parse xml file and write information into context object
87   \param [in] node xmld node corresponding in xml file
88   */
89   void CContext::parse(xml::CXMLNode & node)
90   {
91      CContext::SuperClass::parse(node);
92
93      // PARSING POUR GESTION DES ENFANTS
94      xml::THashAttributes attributes = node.getAttributes();
95
96      if (attributes.end() != attributes.find("src"))
97      {
98         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
99         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
100            ERROR("void CContext::parse(xml::CXMLNode & node)",
101                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
102         if (!ifs.good())
103            ERROR("CContext::parse(xml::CXMLNode & node)",
104                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
105         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
106      }
107
108      if (node.getElementName().compare(CContext::GetName()))
109         DEBUG("Le noeud is wrong defined but will be considered as a context !");
110
111      if (!(node.goToChildElement()))
112      {
113         DEBUG("Le context ne contient pas d'enfant !");
114      }
115      else
116      {
117         do { // Parcours des contextes pour traitement.
118
119            StdString name = node.getElementName();
120            attributes.clear();
121            attributes = node.getAttributes();
122
123            if (attributes.end() != attributes.find("id"))
124            {
125              DEBUG(<< "Definition node has an id,"
126                    << "it will not be taking account !");
127            }
128
129#define DECLARE_NODE(Name_, name_)    \
130   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
131   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
132#define DECLARE_NODE_PAR(Name_, name_)
133#include "node_type.conf"
134
135            DEBUG(<< "The element \'"     << name
136                  << "\' in the context \'" << CContext::getCurrent()->getId()
137                  << "\' is not a definition !");
138
139         } while (node.goToNextElement());
140
141         node.goToParentElement(); // Retour au parent
142      }
143   }
144
145   //----------------------------------------------------------------
146   //! Show tree structure of context
147   void CContext::ShowTree(StdOStream & out)
148   {
149      StdString currentContextId = CContext::getCurrent() -> getId();
150      std::vector<CContext*> def_vector =
151         CContext::getRoot()->getChildList();
152      std::vector<CContext*>::iterator
153         it = def_vector.begin(), end = def_vector.end();
154
155      out << "<? xml version=\"1.0\" ?>" << std::endl;
156      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
157
158      for (; it != end; it++)
159      {
160         CContext* context = *it;
161         CContext::setCurrent(context->getId());
162         out << *context << std::endl;
163      }
164
165      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
166      CContext::setCurrent(currentContextId);
167   }
168
169
170   //----------------------------------------------------------------
171
172   //! Convert context object into string (to print)
173   StdString CContext::toString(void) const
174   {
175      StdOStringStream oss;
176      oss << "<" << CContext::GetName()
177          << " id=\"" << this->getId() << "\" "
178          << SuperClassAttribute::toString() << ">" << std::endl;
179      if (!this->hasChild())
180      {
181         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
182      }
183      else
184      {
185
186#define DECLARE_NODE(Name_, name_)    \
187   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
188   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
189#define DECLARE_NODE_PAR(Name_, name_)
190#include "node_type.conf"
191
192      }
193
194      oss << "</" << CContext::GetName() << " >";
195
196      return (oss.str());
197   }
198
199   //----------------------------------------------------------------
200
201   /*!
202   \brief Find all inheritace among objects in a context.
203   \param [in] apply (true) write attributes of parent into ones of child if they are empty
204                     (false) write attributes of parent into a new container of child
205   \param [in] parent unused
206   */
207   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
208   {
209#define DECLARE_NODE(Name_, name_)    \
210   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
211     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
212#define DECLARE_NODE_PAR(Name_, name_)
213#include "node_type.conf"
214   }
215
216   //----------------------------------------------------------------
217
218   //! Verify if all root definition in the context have child.
219   bool CContext::hasChild(void) const
220   {
221      return (
222#define DECLARE_NODE(Name_, name_)    \
223   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
224#define DECLARE_NODE_PAR(Name_, name_)
225#include "node_type.conf"
226      false);
227}
228
229   //----------------------------------------------------------------
230
231   void CContext::CleanTree(void)
232   {
233#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
234#define DECLARE_NODE_PAR(Name_, name_)
235#include "node_type.conf"
236   }
237   ///---------------------------------------------------------------
238
239   //! Initialize client side
240   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
241   {
242     hasClient=true;
243     client = new CContextClient(this,intraComm, interComm, cxtServer);
244     registryIn=new CRegistry(intraComm);
245     registryIn->setPath(getId()) ;
246     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
247     registryIn->bcastRegistry() ;
248
249     registryOut=new CRegistry(intraComm) ;
250     registryOut->setPath(getId()) ;
251
252     MPI_Comm intraCommServer, interCommServer;
253     if (cxtServer) // Attached mode
254     {
255       intraCommServer = intraComm;
256       interCommServer = interComm;
257     }
258     else
259     {
260       MPI_Comm_dup(intraComm, &intraCommServer);
261       comms.push_back(intraCommServer);
262       MPI_Comm_dup(interComm, &interCommServer);
263       comms.push_back(interCommServer);
264     }
265     server = new CContextServer(this,intraCommServer,interCommServer);
266   }
267
268   void CContext::setClientServerBuffer()
269   {
270     size_t minBufferSize = CXios::minBufferSize;
271#define DECLARE_NODE(Name_, name_)    \
272     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
273#define DECLARE_NODE_PAR(Name_, name_)
274#include "node_type.conf"
275#undef DECLARE_NODE
276#undef DECLARE_NODE_PAR
277
278     std::map<int, StdSize> maxEventSize;
279     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
280     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
281
282     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
283     for (it = dataBufferSize.begin(); it != ite; ++it)
284       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
285
286     ite = bufferSize.end();
287     for (it = bufferSize.begin(); it != ite; ++it)
288     {
289       it->second *= CXios::bufferSizeFactor;
290       if (it->second < minBufferSize) it->second = minBufferSize;
291     }
292
293     client->setBufferSize(bufferSize, maxEventSize);
294   }
295
296   //! Verify whether a context is initialized
297   bool CContext::isInitialized(void)
298   {
299     return hasClient;
300   }
301
302   //! Initialize server
303   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
304   {
305     hasServer=true;
306     server = new CContextServer(this,intraComm,interComm);
307
308     registryIn=new CRegistry(intraComm);
309     registryIn->setPath(getId()) ;
310     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
311     registryIn->bcastRegistry() ;
312     registryOut=new CRegistry(intraComm) ;
313     registryOut->setPath(getId()) ;
314
315     MPI_Comm intraCommClient, interCommClient;
316     if (cxtClient) // Attached mode
317     {
318       intraCommClient = intraComm;
319       interCommClient = interComm;
320     }
321     else
322     {
323       MPI_Comm_dup(intraComm, &intraCommClient);
324       comms.push_back(intraCommClient);
325       MPI_Comm_dup(interComm, &interCommClient);
326       comms.push_back(interCommClient);
327     }
328     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
329   }
330
331   //! Try to send the buffers and receive possible answers
332   bool CContext::checkBuffersAndListen(void)
333   {
334     client->checkBuffers();
335
336     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
337     if (hasTmpBufferedEvent)
338       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
339
340     // Don't process events if there is a temporarily buffered event
341     return server->eventLoop(!hasTmpBufferedEvent);
342   }
343
344   //! Terminate a context
345   void CContext::finalize(void)
346   {
347      if (!finalized)
348      {
349        finalized = true;
350        if (hasClient) sendRegistry() ;
351        client->finalize();
352        while (!server->hasFinished())
353        {
354          server->eventLoop();
355        }
356
357        if (hasServer)
358        {
359          closeAllFile();
360          registryOut->hierarchicalGatherRegistry() ;
361          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
362        }
363
364        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
365          MPI_Comm_free(&(*it));
366        comms.clear();
367      }
368   }
369
370   /*!
371   \brief Close all the context defintion and do processing data
372      After everything is well defined on client side, they will be processed and sent to server
373   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
374   all necessary information to server, from which each server can build its own database.
375   Because the role of server is to write out field data on a specific netcdf file,
376   the only information that it needs is the enabled files
377   and the active fields (fields will be written onto active files)
378   */
379   void CContext::closeDefinition(void)
380   {
381     CTimer::get("Context : close definition").resume() ;
382     // There is nothing client need to send to server
383     if (hasClient)
384     {
385       // After xml is parsed, there are some more works with post processing
386       postProcessing();
387     }
388     setClientServerBuffer();
389
390     if (hasClient && !hasServer)
391     {
392      // Send all attributes of current context to server
393      this->sendAllAttributesToServer();
394
395      // Send all attributes of current calendar
396      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
397
398      // We have enough information to send to server
399      // First of all, send all enabled files
400       sendEnabledFiles();
401
402      // Then, send all enabled fields
403       sendEnabledFields();
404
405      // At last, we have all info of domain and axis, then send them
406       sendRefDomainsAxis();
407
408      // After that, send all grid (if any)
409       sendRefGrid();
410    }
411
412    // We have a xml tree on the server side and now, it should be also processed
413    if (hasClient && !hasServer) sendPostProcessing();
414
415    // There are some processings that should be done after all of above. For example: check mask or index
416    if (hasClient)
417    {
418      this->buildFilterGraphOfEnabledFields();
419      buildFilterGraphOfFieldsWithReadAccess();
420      this->solveAllRefOfEnabledFields(true);
421    }
422
423    // Now tell server that it can process all messages from client
424    if (hasClient && !hasServer) this->sendCloseDefinition();
425
426    // Nettoyage de l'arborescence
427    if (hasClient && !hasServer) CleanTree(); // Only on client side??
428
429    if (hasClient)
430    {
431      sendCreateFileHeader();
432
433      startPrefetchingOfEnabledReadModeFiles();
434    }
435    CTimer::get("Context : close definition").suspend() ;
436   }
437
438   void CContext::findAllEnabledFields(void)
439   {
440     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
441     (void)this->enabledFiles[i]->getEnabledFields();
442   }
443
444   void CContext::findAllEnabledFieldsInReadModeFiles(void)
445   {
446     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
447     (void)this->enabledReadModeFiles[i]->getEnabledFields();
448   }
449
450   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
451   {
452      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
453        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
454   }
455
456   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
457   {
458     int size = this->enabledFiles.size();
459     for (int i = 0; i < size; ++i)
460     {
461       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
462     }
463
464     for (int i = 0; i < size; ++i)
465     {
466       this->enabledFiles[i]->generateNewTransformationGridDest();
467     }
468   }
469
470   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
471   {
472     int size = this->enabledFiles.size();
473     for (int i = 0; i < size; ++i)
474     {
475       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
476     }
477   }
478
479   void CContext::buildFilterGraphOfEnabledFields()
480   {
481     int size = this->enabledFiles.size();
482     for (int i = 0; i < size; ++i)
483     {
484       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
485     }
486   }
487
488   void CContext::startPrefetchingOfEnabledReadModeFiles()
489   {
490     int size = enabledReadModeFiles.size();
491     for (int i = 0; i < size; ++i)
492     {
493        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
494     }
495   }
496
497   void CContext::checkPrefetchingOfEnabledReadModeFiles()
498   {
499     int size = enabledReadModeFiles.size();
500     for (int i = 0; i < size; ++i)
501     {
502        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
503     }
504   }
505
506  void CContext::findFieldsWithReadAccess(void)
507  {
508    fieldsWithReadAccess.clear();
509    const vector<CField*> allFields = CField::getAll();
510    for (size_t i = 0; i < allFields.size(); ++i)
511    {
512      CField* field = allFields[i];
513
514      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
515        field->read_access = true;
516      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
517        fieldsWithReadAccess.push_back(field);
518    }
519  }
520
521  void CContext::solveAllRefOfFieldsWithReadAccess()
522  {
523    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
524      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
525  }
526
527  void CContext::buildFilterGraphOfFieldsWithReadAccess()
528  {
529    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
530      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
531  }
532
533   void CContext::solveAllInheritance(bool apply)
534   {
535     // Résolution des héritages descendants (càd des héritages de groupes)
536     // pour chacun des contextes.
537      solveDescInheritance(apply);
538
539     // Résolution des héritages par référence au niveau des fichiers.
540      const vector<CFile*> allFiles=CFile::getAll();
541      const vector<CGrid*> allGrids= CGrid::getAll();
542
543     //if (hasClient && !hasServer)
544      if (hasClient)
545      {
546        for (unsigned int i = 0; i < allFiles.size(); i++)
547          allFiles[i]->solveFieldRefInheritance(apply);
548      }
549
550      unsigned int vecSize = allGrids.size();
551      unsigned int i = 0;
552      for (i = 0; i < vecSize; ++i)
553        allGrids[i]->solveDomainAxisRefInheritance(apply);
554
555   }
556
557   void CContext::findEnabledFiles(void)
558   {
559      const std::vector<CFile*> allFiles = CFile::getAll();
560      const CDate& initDate = calendar->getInitDate();
561
562      for (unsigned int i = 0; i < allFiles.size(); i++)
563         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
564         {
565            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
566            {
567              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
568              {
569                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
570                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
571                    <<"\" is less than the time step. File will not be written."<<endl;
572              }
573              else
574               enabledFiles.push_back(allFiles[i]);
575            }
576         }
577         else
578         {
579           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
580           {
581             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
582                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
583                 <<"\" is less than the time step. File will not be written."<<endl;
584           }
585           else
586             enabledFiles.push_back(allFiles[i]); // otherwise true by default
587         }
588
589      if (enabledFiles.size() == 0)
590         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
591               << getId() << "\" !");
592   }
593
594   void CContext::findEnabledReadModeFiles(void)
595   {
596     int size = this->enabledFiles.size();
597     for (int i = 0; i < size; ++i)
598     {
599       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
600        enabledReadModeFiles.push_back(enabledFiles[i]);
601     }
602   }
603
604   void CContext::closeAllFile(void)
605   {
606     std::vector<CFile*>::const_iterator
607            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
608
609     for (; it != end; it++)
610     {
611       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
612       (*it)->close();
613     }
614   }
615
616   /*!
617   \brief Dispatch event received from client
618      Whenever a message is received in buffer of server, it will be processed depending on
619   its event type. A new event type should be added in the switch list to make sure
620   it processed on server side.
621   \param [in] event: Received message
622   */
623   bool CContext::dispatchEvent(CEventServer& event)
624   {
625
626      if (SuperClass::dispatchEvent(event)) return true;
627      else
628      {
629        switch(event.type)
630        {
631           case EVENT_ID_CLOSE_DEFINITION :
632             recvCloseDefinition(event);
633             return true;
634             break;
635           case EVENT_ID_UPDATE_CALENDAR:
636             recvUpdateCalendar(event);
637             return true;
638             break;
639           case EVENT_ID_CREATE_FILE_HEADER :
640             recvCreateFileHeader(event);
641             return true;
642             break;
643           case EVENT_ID_POST_PROCESS:
644             recvPostProcessing(event);
645             return true;
646            case EVENT_ID_SEND_REGISTRY:
647             recvRegistry(event);
648             return true;
649            break;
650
651           default :
652             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
653                    <<"Unknown Event");
654           return false;
655         }
656      }
657   }
658
659   //! Client side: Send a message to server to make it close
660   void CContext::sendCloseDefinition(void)
661   {
662     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
663     if (client->isServerLeader())
664     {
665       CMessage msg;
666       msg<<this->getIdServer();
667       const std::list<int>& ranks = client->getRanksServerLeader();
668       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
669         event.push(*itRank,1,msg);
670       client->sendEvent(event);
671     }
672     else client->sendEvent(event);
673   }
674
675   //! Server side: Receive a message of client announcing a context close
676   void CContext::recvCloseDefinition(CEventServer& event)
677   {
678
679      CBufferIn* buffer=event.subEvents.begin()->buffer;
680      string id;
681      *buffer>>id;
682      get(id)->closeDefinition();
683   }
684
685   //! Client side: Send a message to update calendar in each time step
686   void CContext::sendUpdateCalendar(int step)
687   {
688     if (!hasServer)
689     {
690       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
691       if (client->isServerLeader())
692       {
693         CMessage msg;
694         msg<<this->getIdServer()<<step;
695         const std::list<int>& ranks = client->getRanksServerLeader();
696         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
697           event.push(*itRank,1,msg);
698         client->sendEvent(event);
699       }
700       else client->sendEvent(event);
701     }
702   }
703
704   //! Server side: Receive a message of client annoucing calendar update
705   void CContext::recvUpdateCalendar(CEventServer& event)
706   {
707      CBufferIn* buffer=event.subEvents.begin()->buffer;
708      string id;
709      *buffer>>id;
710      get(id)->recvUpdateCalendar(*buffer);
711   }
712
713   //! Server side: Receive a message of client annoucing calendar update
714   void CContext::recvUpdateCalendar(CBufferIn& buffer)
715   {
716      int step;
717      buffer>>step;
718      updateCalendar(step);
719   }
720
721   //! Client side: Send a message to create header part of netcdf file
722   void CContext::sendCreateFileHeader(void)
723   {
724     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
725     if (client->isServerLeader())
726     {
727       CMessage msg;
728       msg<<this->getIdServer();
729       const std::list<int>& ranks = client->getRanksServerLeader();
730       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
731         event.push(*itRank,1,msg) ;
732       client->sendEvent(event);
733     }
734     else client->sendEvent(event);
735   }
736
737   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
738   void CContext::recvCreateFileHeader(CEventServer& event)
739   {
740      CBufferIn* buffer=event.subEvents.begin()->buffer;
741      string id;
742      *buffer>>id;
743      get(id)->recvCreateFileHeader(*buffer);
744   }
745
746   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
747   void CContext::recvCreateFileHeader(CBufferIn& buffer)
748   {
749      createFileHeader();
750   }
751
752   //! Client side: Send a message to do some post processing on server
753   void CContext::sendPostProcessing()
754   {
755     if (!hasServer)
756     {
757       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
758       if (client->isServerLeader())
759       {
760         CMessage msg;
761         msg<<this->getIdServer();
762         const std::list<int>& ranks = client->getRanksServerLeader();
763         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
764           event.push(*itRank,1,msg);
765         client->sendEvent(event);
766       }
767       else client->sendEvent(event);
768     }
769   }
770
771   //! Server side: Receive a message to do some post processing
772   void CContext::recvPostProcessing(CEventServer& event)
773   {
774      CBufferIn* buffer=event.subEvents.begin()->buffer;
775      string id;
776      *buffer>>id;
777      get(id)->recvPostProcessing(*buffer);
778   }
779
780   //! Server side: Receive a message to do some post processing
781   void CContext::recvPostProcessing(CBufferIn& buffer)
782   {
783      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
784      postProcessing();
785   }
786
787   const StdString& CContext::getIdServer()
788   {
789      if (hasClient)
790      {
791        idServer_ = this->getId();
792        idServer_ += "_server";
793        return idServer_;
794      }
795      if (hasServer) return (this->getId());
796   }
797
798   /*!
799   \brief Do some simple post processings after parsing xml file
800      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
801   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
802   which will be written out into netcdf files, are processed
803   */
804   void CContext::postProcessing()
805   {
806     if (isPostProcessed) return;
807
808      // Make sure the calendar was correctly created
809      if (!calendar)
810        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
811      else if (calendar->getTimeStep() == NoneDu)
812        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
813      // Calendar first update to set the current date equals to the start date
814      calendar->update(0);
815
816      // Find all inheritance in xml structure
817      this->solveAllInheritance();
818
819      // Check if some axis, domains or grids are eligible to for compressed indexed output.
820      // Warning: This must be done after solving the inheritance and before the rest of post-processing
821      checkAxisDomainsGridsEligibilityForCompressedOutput();
822
823      // Check if some automatic time series should be generated
824      // Warning: This must be done after solving the inheritance and before the rest of post-processing
825      prepareTimeseries();
826
827      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
828      this->findEnabledFiles();
829      this->findEnabledReadModeFiles();
830
831      // Find all enabled fields of each file
832      this->findAllEnabledFields();
833      this->findAllEnabledFieldsInReadModeFiles();
834
835     if (hasClient && !hasServer)
836     {
837      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
838      this->readAttributesOfEnabledFieldsInReadModeFiles();
839     }
840
841      // Only search and rebuild all reference objects of enable fields, don't transform
842      this->solveOnlyRefOfEnabledFields(false);
843
844      // Search and rebuild all reference object of enabled fields
845      this->solveAllRefOfEnabledFields(false);
846
847      // Find all fields with read access from the public API
848      findFieldsWithReadAccess();
849      // and solve the all reference for them
850      solveAllRefOfFieldsWithReadAccess();
851
852      isPostProcessed = true;
853   }
854
855   /*!
856    * Compute the required buffer size to send the attributes (mostly those grid related).
857    *
858    * \param maxEventSize [in/out] the size of the bigger event for each connected server
859    */
860   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
861   {
862     std::map<int, StdSize> attributesSize;
863
864     if (hasClient)
865     {
866       size_t numEnabledFiles = this->enabledFiles.size();
867       for (size_t i = 0; i < numEnabledFiles; ++i)
868       {
869         CFile* file = this->enabledFiles[i];
870
871         std::vector<CField*> enabledFields = file->getEnabledFields();
872         size_t numEnabledFields = enabledFields.size();
873         for (size_t j = 0; j < numEnabledFields; ++j)
874         {
875           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
876           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
877           for (; it != itE; ++it)
878           {
879             // If attributesSize[it->first] does not exist, it will be zero-initialized
880             // so we can use it safely without checking for its existance
881             if (attributesSize[it->first] < it->second)
882               attributesSize[it->first] = it->second;
883
884             if (maxEventSize[it->first] < it->second)
885               maxEventSize[it->first] = it->second;
886           }
887         }
888       }
889     }
890
891     return attributesSize;
892   }
893
894   /*!
895    * Compute the required buffer size to send the fields data.
896    *
897    * \param maxEventSize [in/out] the size of the bigger event for each connected server
898    */
899   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
900   {
901     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
902
903     std::map<int, StdSize> dataSize;
904
905     // Find all reference domain and axis of all active fields
906     size_t numEnabledFiles = this->enabledFiles.size();
907     for (size_t i = 0; i < numEnabledFiles; ++i)
908     {
909       CFile* file = this->enabledFiles[i];
910       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
911
912       if (fileMode == mode)
913       {
914         std::vector<CField*> enabledFields = file->getEnabledFields();
915         size_t numEnabledFields = enabledFields.size();
916         for (size_t j = 0; j < numEnabledFields; ++j)
917         {
918           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
919           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
920           for (; it != itE; ++it)
921           {
922             // If dataSize[it->first] does not exist, it will be zero-initialized
923             // so we can use it safely without checking for its existance
924             if (CXios::isOptPerformance)
925               dataSize[it->first] += it->second;
926             else if (dataSize[it->first] < it->second)
927               dataSize[it->first] = it->second;
928
929             if (maxEventSize[it->first] < it->second)
930               maxEventSize[it->first] = it->second;
931           }
932         }
933       }
934     }
935
936     return dataSize;
937   }
938
939   //! Client side: Send infomation of active files (files are enabled to write out)
940   void CContext::sendEnabledFiles()
941   {
942     int size = this->enabledFiles.size();
943
944     // In a context, each type has a root definition, e.g: axis, domain, field.
945     // Every object must be a child of one of these root definition. In this case
946     // all new file objects created on server must be children of the root "file_definition"
947     StdString fileDefRoot("file_definition");
948     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
949
950     for (int i = 0; i < size; ++i)
951     {
952       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
953       this->enabledFiles[i]->sendAllAttributesToServer();
954       this->enabledFiles[i]->sendAddAllVariables();
955     }
956   }
957
958   //! Client side: Send information of active fields (ones are written onto files)
959   void CContext::sendEnabledFields()
960   {
961     int size = this->enabledFiles.size();
962     for (int i = 0; i < size; ++i)
963     {
964       this->enabledFiles[i]->sendEnabledFields();
965     }
966   }
967
968   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
969   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
970   {
971     if (!hasClient) return;
972
973     const vector<CAxis*> allAxis = CAxis::getAll();
974     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
975       (*it)->checkEligibilityForCompressedOutput();
976
977     const vector<CDomain*> allDomains = CDomain::getAll();
978     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
979       (*it)->checkEligibilityForCompressedOutput();
980
981     const vector<CGrid*> allGrids = CGrid::getAll();
982     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
983       (*it)->checkEligibilityForCompressedOutput();
984   }
985
986   //! Client side: Prepare the timeseries by adding the necessary files
987   void CContext::prepareTimeseries()
988   {
989     if (!hasClient) return;
990
991     const std::vector<CFile*> allFiles = CFile::getAll();
992     for (size_t i = 0; i < allFiles.size(); i++)
993     {
994       CFile* file = allFiles[i];
995
996       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
997       for (size_t k = 0; k < vars.size(); k++)
998       {
999         CVariable* var = vars[k];
1000
1001         if (var->ts_target.isEmpty()
1002              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1003           fileVars.push_back(var);
1004
1005         if (!var->ts_target.isEmpty()
1006              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1007           fieldVars.push_back(var);
1008       }
1009
1010       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1011       {
1012         StdString fileNameStr("%file_name%") ;
1013         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1014         
1015         StdString fileName=file->getFileOutputName();
1016         size_t pos=tsPrefix.find(fileNameStr) ;
1017         while (pos!=std::string::npos)
1018         {
1019           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1020           pos=tsPrefix.find(fileNameStr) ;
1021         }
1022       
1023         const std::vector<CField*> allFields = file->getAllFields();
1024         for (size_t j = 0; j < allFields.size(); j++)
1025         {
1026           CField* field = allFields[j];
1027
1028           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1029           {
1030             CFile* tsFile = CFile::create();
1031             tsFile->duplicateAttributes(file);
1032
1033             // Add variables originating from file and targeted to timeserie file
1034             for (size_t k = 0; k < fileVars.size(); k++)
1035               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1036
1037           
1038             tsFile->name = tsPrefix + "_";
1039             if (!field->name.isEmpty())
1040               tsFile->name.get() += field->name;
1041             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1042               tsFile->name.get() += field->field_ref;
1043             else
1044               tsFile->name.get() += field->getId();
1045
1046             if (!field->ts_split_freq.isEmpty())
1047               tsFile->split_freq = field->ts_split_freq;
1048
1049             CField* tsField = tsFile->addField();
1050             tsField->field_ref = field->getId();
1051
1052             // Add variables originating from file and targeted to timeserie field
1053             for (size_t k = 0; k < fieldVars.size(); k++)
1054               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1055
1056             vars = field->getAllVariables();
1057             for (size_t k = 0; k < vars.size(); k++)
1058             {
1059               CVariable* var = vars[k];
1060
1061               // Add variables originating from field and targeted to timeserie field
1062               if (var->ts_target.isEmpty()
1063                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1064                 tsField->getVirtualVariableGroup()->addChild(var);
1065
1066               // Add variables originating from field and targeted to timeserie file
1067               if (!var->ts_target.isEmpty()
1068                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1069                 tsFile->getVirtualVariableGroup()->addChild(var);
1070             }
1071
1072             tsFile->solveFieldRefInheritance(true);
1073
1074             if (file->timeseries == CFile::timeseries_attr::exclusive)
1075               field->enabled = false;
1076           }
1077         }
1078
1079         // Finally disable the original file is need be
1080         if (file->timeseries == CFile::timeseries_attr::only)
1081          file->enabled = false;
1082       }
1083     }
1084   }
1085
1086   //! Client side: Send information of reference grid of active fields
1087   void CContext::sendRefGrid()
1088   {
1089     std::set<StdString> gridIds;
1090     int sizeFile = this->enabledFiles.size();
1091     CFile* filePtr(NULL);
1092
1093     // Firstly, find all reference grids of all active fields
1094     for (int i = 0; i < sizeFile; ++i)
1095     {
1096       filePtr = this->enabledFiles[i];
1097       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1098       int sizeField = enabledFields.size();
1099       for (int numField = 0; numField < sizeField; ++numField)
1100       {
1101         if (0 != enabledFields[numField]->getRelGrid())
1102           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1103       }
1104     }
1105
1106     // Create all reference grids on server side
1107     StdString gridDefRoot("grid_definition");
1108     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1109     std::set<StdString>::const_iterator it, itE = gridIds.end();
1110     for (it = gridIds.begin(); it != itE; ++it)
1111     {
1112       gridPtr->sendCreateChild(*it);
1113       CGrid::get(*it)->sendAllAttributesToServer();
1114       CGrid::get(*it)->sendAllDomains();
1115       CGrid::get(*it)->sendAllAxis();
1116       CGrid::get(*it)->sendAllScalars();
1117     }
1118   }
1119
1120
1121   //! Client side: Send information of reference domain and axis of active fields
1122   void CContext::sendRefDomainsAxis()
1123   {
1124     std::set<StdString> domainIds, axisIds, scalarIds;
1125
1126     // Find all reference domain and axis of all active fields
1127     int numEnabledFiles = this->enabledFiles.size();
1128     for (int i = 0; i < numEnabledFiles; ++i)
1129     {
1130       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1131       int numEnabledFields = enabledFields.size();
1132       for (int j = 0; j < numEnabledFields; ++j)
1133       {
1134         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1135         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1136         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1137         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1138       }
1139     }
1140
1141     // Create all reference axis on server side
1142     std::set<StdString>::iterator itDom, itAxis, itScalar;
1143     std::set<StdString>::const_iterator itE;
1144
1145     StdString scalarDefRoot("scalar_definition");
1146     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1147     itE = scalarIds.end();
1148     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1149     {
1150       if (!itScalar->empty())
1151       {
1152         scalarPtr->sendCreateChild(*itScalar);
1153         CScalar::get(*itScalar)->sendAllAttributesToServer();
1154       }
1155     }
1156
1157     StdString axiDefRoot("axis_definition");
1158     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1159     itE = axisIds.end();
1160     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1161     {
1162       if (!itAxis->empty())
1163       {
1164         axisPtr->sendCreateChild(*itAxis);
1165         CAxis::get(*itAxis)->sendAllAttributesToServer();
1166       }
1167     }
1168
1169     // Create all reference domains on server side
1170     StdString domDefRoot("domain_definition");
1171     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1172     itE = domainIds.end();
1173     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1174     {
1175       if (!itDom->empty()) {
1176          domPtr->sendCreateChild(*itDom);
1177          CDomain::get(*itDom)->sendAllAttributesToServer();
1178       }
1179     }
1180   }
1181
1182   //! Update calendar in each time step
1183   void CContext::updateCalendar(int step)
1184   {
1185      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1186      calendar->update(step);
1187      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1188#ifdef XIOS_MEMTRACK_LIGHT
1189      info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
1190#endif
1191      if (hasClient)
1192      {
1193        checkPrefetchingOfEnabledReadModeFiles();
1194        garbageCollector.invalidate(calendar->getCurrentDate());
1195      }
1196   }
1197
1198   //! Server side: Create header of netcdf file
1199   void CContext::createFileHeader(void )
1200   {
1201      vector<CFile*>::const_iterator it;
1202
1203      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1204      {
1205         (*it)->initFile();
1206      }
1207   }
1208
1209   //! Get current context
1210   CContext* CContext::getCurrent(void)
1211   {
1212     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1213   }
1214
1215   /*!
1216   \brief Set context with an id be the current context
1217   \param [in] id identity of context to be set to current
1218   */
1219   void CContext::setCurrent(const string& id)
1220   {
1221     CObjectFactory::SetCurrentContextId(id);
1222     CGroupFactory::SetCurrentContextId(id);
1223   }
1224
1225  /*!
1226  \brief Create a context with specific id
1227  \param [in] id identity of new context
1228  \return pointer to the new context or already-existed one with identity id
1229  */
1230  CContext* CContext::create(const StdString& id)
1231  {
1232    CContext::setCurrent(id);
1233
1234    bool hasctxt = CContext::has(id);
1235    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1236    getRoot();
1237    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1238
1239#define DECLARE_NODE(Name_, name_) \
1240    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1241#define DECLARE_NODE_PAR(Name_, name_)
1242#include "node_type.conf"
1243
1244    return (context);
1245  }
1246
1247
1248
1249     //! Server side: Receive a message to do some post processing
1250  void CContext::recvRegistry(CEventServer& event)
1251  {
1252    CBufferIn* buffer=event.subEvents.begin()->buffer;
1253    string id;
1254    *buffer>>id;
1255    get(id)->recvRegistry(*buffer);
1256  }
1257
1258  void CContext::recvRegistry(CBufferIn& buffer)
1259  {
1260    if (server->intraCommRank==0)
1261    {
1262      CRegistry registry(server->intraComm) ;
1263      registry.fromBuffer(buffer) ;
1264      registryOut->mergeRegistry(registry) ;
1265    }
1266  }
1267
1268  void CContext::sendRegistry(void)
1269  {
1270    registryOut->hierarchicalGatherRegistry() ;
1271
1272    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1273    if (client->isServerLeader())
1274    {
1275       CMessage msg ;
1276       msg<<this->getIdServer();
1277       if (client->clientRank==0) msg<<*registryOut ;
1278       const std::list<int>& ranks = client->getRanksServerLeader();
1279       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1280         event.push(*itRank,1,msg);
1281       client->sendEvent(event);
1282     }
1283     else client->sendEvent(event);
1284  }
1285
1286} // namespace xios
Note: See TracBrowser for help on using the repository browser.