source: XIOS/trunk/src/node/context.cpp @ 731

Last change on this file since 731 was 731, checked in by rlacroix, 9 years ago

Correctly estimate the needed buffer sizes.

The attributes were not considered which could lead to incorrect estimations.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 37.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> bufferSize = getAttributesBufferSize();
277     std::map<int, StdSize>::iterator it, ite = bufferSize.end();
278     for (it = bufferSize.begin(); it != ite; ++it)
279       if (it->second < minBufferSize) it->second = minBufferSize;
280
281     std::map<int, StdSize> dataBufferSize = getDataBufferSize();
282     ite = dataBufferSize.end();
283     for (it = dataBufferSize.begin(); it != ite; ++it)
284       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
285
286     if (client->isServerLeader())
287     {
288       const std::list<int>& ranks = client->getRanksServerLeader();
289       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
290         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = minBufferSize;
291     }
292
293     if (!bufferSize.empty())
294       client->setBufferSize(bufferSize);
295   }
296
297   //! Verify whether a context is initialized
298   bool CContext::isInitialized(void)
299   {
300     return hasClient;
301   }
302
303   //! Initialize server
304   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
305   {
306     hasServer=true;
307     server = new CContextServer(this,intraComm,interComm);
308
309     registryIn=new CRegistry(intraComm);
310     registryIn->setPath(getId()) ;
311     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
312     registryIn->bcastRegistry() ;
313     registryOut=new CRegistry(intraComm) ;
314     registryOut->setPath(getId()) ;
315 
316     MPI_Comm intraCommClient, interCommClient;
317     if (cxtClient) // Attached mode
318     {
319       intraCommClient = intraComm;
320       interCommClient = interComm;
321     }
322     else
323     {
324       MPI_Comm_dup(intraComm, &intraCommClient);
325       comms.push_back(intraCommClient);
326       MPI_Comm_dup(interComm, &interCommClient);
327       comms.push_back(interCommClient);
328     }
329     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
330   }
331
332   //! Server side: Put server into a loop in order to listen message from client
333   bool CContext::eventLoop(void)
334   {
335     return server->eventLoop();
336   }
337
338   //! Try to send the buffers and receive possible answers
339   bool CContext::checkBuffersAndListen(void)
340   {
341     client->checkBuffers();
342     return server->eventLoop();
343   }
344
345   //! Terminate a context
346   void CContext::finalize(void)
347   {
348      if (!finalized)
349      {
350        finalized = true;
351        if (hasClient) sendRegistry() ;
352        client->finalize();
353        while (!server->hasFinished())
354        {
355          server->eventLoop();
356        }
357
358        if (hasServer)
359        {
360          closeAllFile();
361          registryOut->hierarchicalGatherRegistry() ;
362          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
363        }
364       
365        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
366          MPI_Comm_free(&(*it));
367        comms.clear();
368      }
369   }
370
371   /*!
372   \brief Close all the context defintion and do processing data
373      After everything is well defined on client side, they will be processed and sent to server
374   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
375   all necessary information to server, from which each server can build its own database.
376   Because the role of server is to write out field data on a specific netcdf file,
377   the only information that it needs is the enabled files
378   and the active fields (fields will be written onto active files)
379   */
380   void CContext::closeDefinition(void)
381   {
382     // There is nothing client need to send to server
383     if (hasClient)
384     {
385       // After xml is parsed, there are some more works with post processing
386       postProcessing();
387     }
388     setClientServerBuffer();
389
390     if (hasClient && !hasServer)
391     {
392      // Send all attributes of current context to server
393      this->sendAllAttributesToServer();
394
395      // Send all attributes of current calendar
396      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
397
398      // We have enough information to send to server
399      // First of all, send all enabled files
400       sendEnabledFiles();
401
402      // Then, send all enabled fields
403       sendEnabledFields();
404
405      // At last, we have all info of domain and axis, then send them
406       sendRefDomainsAxis();
407
408      // After that, send all grid (if any)
409       sendRefGrid();
410    }
411
412    // We have a xml tree on the server side and now, it should be also processed
413    if (hasClient && !hasServer) sendPostProcessing();
414
415    // There are some processings that should be done after all of above. For example: check mask or index
416    if (hasClient)
417    {
418      this->buildFilterGraphOfEnabledFields();
419      buildFilterGraphOfFieldsWithReadAccess();
420      this->solveAllRefOfEnabledFields(true);
421    }
422
423    // Now tell server that it can process all messages from client
424    if (hasClient && !hasServer) this->sendCloseDefinition();
425
426    // Nettoyage de l'arborescence
427    if (hasClient && !hasServer) CleanTree(); // Only on client side??
428
429    if (hasClient)
430    {
431      sendCreateFileHeader();
432
433      startPrefetchingOfEnabledReadModeFiles();
434    }
435   }
436
437   void CContext::findAllEnabledFields(void)
438   {
439     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
440     (void)this->enabledFiles[i]->getEnabledFields();
441   }
442
443   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
444   {
445     int size = this->enabledFiles.size();
446     for (int i = 0; i < size; ++i)
447     {
448       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
449     }
450   }
451
452   void CContext::buildFilterGraphOfEnabledFields()
453   {
454     int size = this->enabledFiles.size();
455     for (int i = 0; i < size; ++i)
456     {
457       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
458     }
459   }
460
461   void CContext::startPrefetchingOfEnabledReadModeFiles()
462   {
463     int size = enabledReadModeFiles.size();
464     for (int i = 0; i < size; ++i)
465     {
466        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
467     }
468   }
469
470   void CContext::checkPrefetchingOfEnabledReadModeFiles()
471   {
472     int size = enabledReadModeFiles.size();
473     for (int i = 0; i < size; ++i)
474     {
475        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
476     }
477   }
478
479  void CContext::findFieldsWithReadAccess(void)
480  {
481    fieldsWithReadAccess.clear();
482    const vector<CField*> allFields = CField::getAll();
483    for (size_t i = 0; i < allFields.size(); ++i)
484    {
485      if (allFields[i]->file && !allFields[i]->file->mode.isEmpty() && allFields[i]->file->mode.getValue() == CFile::mode_attr::read)
486        allFields[i]->read_access = true;
487      if (!allFields[i]->read_access.isEmpty() && allFields[i]->read_access.getValue())
488        fieldsWithReadAccess.push_back(allFields[i]);
489    }
490  }
491
492  void CContext::solveAllRefOfFieldsWithReadAccess()
493  {
494    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
495      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
496  }
497
498  void CContext::buildFilterGraphOfFieldsWithReadAccess()
499  {
500    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
501      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
502  }
503
504   void CContext::solveAllInheritance(bool apply)
505   {
506     // Résolution des héritages descendants (càd des héritages de groupes)
507     // pour chacun des contextes.
508      solveDescInheritance(apply);
509
510     // Résolution des héritages par référence au niveau des fichiers.
511      const vector<CFile*> allFiles=CFile::getAll();
512      const vector<CGrid*> allGrids= CGrid::getAll();
513
514     //if (hasClient && !hasServer)
515      if (hasClient)
516      {
517        for (unsigned int i = 0; i < allFiles.size(); i++)
518          allFiles[i]->solveFieldRefInheritance(apply);
519      }
520
521      unsigned int vecSize = allGrids.size();
522      unsigned int i = 0;
523      for (i = 0; i < vecSize; ++i)
524        allGrids[i]->solveDomainAxisRefInheritance(apply);
525
526   }
527
528   void CContext::findEnabledFiles(void)
529   {
530      const std::vector<CFile*> allFiles = CFile::getAll();
531
532      for (unsigned int i = 0; i < allFiles.size(); i++)
533         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
534         {
535            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
536               enabledFiles.push_back(allFiles[i]);
537         }
538         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
539
540
541      if (enabledFiles.size() == 0)
542         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
543               << getId() << "\" !");
544   }
545
546   void CContext::findEnabledReadModeFiles(void)
547   {
548     int size = this->enabledFiles.size();
549     for (int i = 0; i < size; ++i)
550     {
551       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
552        enabledReadModeFiles.push_back(enabledFiles[i]);
553     }
554   }
555
556   void CContext::closeAllFile(void)
557   {
558     std::vector<CFile*>::const_iterator
559            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
560
561     for (; it != end; it++)
562     {
563       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
564       (*it)->close();
565     }
566   }
567
568   /*!
569   \brief Dispatch event received from client
570      Whenever a message is received in buffer of server, it will be processed depending on
571   its event type. A new event type should be added in the switch list to make sure
572   it processed on server side.
573   \param [in] event: Received message
574   */
575   bool CContext::dispatchEvent(CEventServer& event)
576   {
577
578      if (SuperClass::dispatchEvent(event)) return true;
579      else
580      {
581        switch(event.type)
582        {
583           case EVENT_ID_CLOSE_DEFINITION :
584             recvCloseDefinition(event);
585             return true;
586             break;
587           case EVENT_ID_UPDATE_CALENDAR:
588             recvUpdateCalendar(event);
589             return true;
590             break;
591           case EVENT_ID_CREATE_FILE_HEADER :
592             recvCreateFileHeader(event);
593             return true;
594             break;
595           case EVENT_ID_POST_PROCESS:
596             recvPostProcessing(event);
597             return true;
598            case EVENT_ID_SEND_REGISTRY:
599             recvRegistry(event);
600             return true;
601            break;
602
603           default :
604             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
605                    <<"Unknown Event");
606           return false;
607         }
608      }
609   }
610
611   //! Client side: Send a message to server to make it close
612   void CContext::sendCloseDefinition(void)
613   {
614     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
615     if (client->isServerLeader())
616     {
617       CMessage msg;
618       msg<<this->getIdServer();
619       const std::list<int>& ranks = client->getRanksServerLeader();
620       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
621         event.push(*itRank,1,msg);
622       client->sendEvent(event);
623     }
624     else client->sendEvent(event);
625   }
626
627   //! Server side: Receive a message of client announcing a context close
628   void CContext::recvCloseDefinition(CEventServer& event)
629   {
630
631      CBufferIn* buffer=event.subEvents.begin()->buffer;
632      string id;
633      *buffer>>id;
634      get(id)->closeDefinition();
635   }
636
637   //! Client side: Send a message to update calendar in each time step
638   void CContext::sendUpdateCalendar(int step)
639   {
640     if (!hasServer)
641     {
642       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
643       if (client->isServerLeader())
644       {
645         CMessage msg;
646         msg<<this->getIdServer()<<step;
647         const std::list<int>& ranks = client->getRanksServerLeader();
648         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
649           event.push(*itRank,1,msg);
650         client->sendEvent(event);
651       }
652       else client->sendEvent(event);
653     }
654   }
655
656   //! Server side: Receive a message of client annoucing calendar update
657   void CContext::recvUpdateCalendar(CEventServer& event)
658   {
659      CBufferIn* buffer=event.subEvents.begin()->buffer;
660      string id;
661      *buffer>>id;
662      get(id)->recvUpdateCalendar(*buffer);
663   }
664
665   //! Server side: Receive a message of client annoucing calendar update
666   void CContext::recvUpdateCalendar(CBufferIn& buffer)
667   {
668      int step;
669      buffer>>step;
670      updateCalendar(step);
671   }
672
673   //! Client side: Send a message to create header part of netcdf file
674   void CContext::sendCreateFileHeader(void)
675   {
676     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
677     if (client->isServerLeader())
678     {
679       CMessage msg;
680       msg<<this->getIdServer();
681       const std::list<int>& ranks = client->getRanksServerLeader();
682       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
683         event.push(*itRank,1,msg) ;
684       client->sendEvent(event);
685     }
686     else client->sendEvent(event);
687   }
688
689   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
690   void CContext::recvCreateFileHeader(CEventServer& event)
691   {
692      CBufferIn* buffer=event.subEvents.begin()->buffer;
693      string id;
694      *buffer>>id;
695      get(id)->recvCreateFileHeader(*buffer);
696   }
697
698   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
699   void CContext::recvCreateFileHeader(CBufferIn& buffer)
700   {
701      createFileHeader();
702   }
703
704   //! Client side: Send a message to do some post processing on server
705   void CContext::sendPostProcessing()
706   {
707     if (!hasServer)
708     {
709       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
710       if (client->isServerLeader())
711       {
712         CMessage msg;
713         msg<<this->getIdServer();
714         const std::list<int>& ranks = client->getRanksServerLeader();
715         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
716           event.push(*itRank,1,msg);
717         client->sendEvent(event);
718       }
719       else client->sendEvent(event);
720     }
721   }
722
723   //! Server side: Receive a message to do some post processing
724   void CContext::recvPostProcessing(CEventServer& event)
725   {
726      CBufferIn* buffer=event.subEvents.begin()->buffer;
727      string id;
728      *buffer>>id;
729      get(id)->recvPostProcessing(*buffer);
730   }
731
732   //! Server side: Receive a message to do some post processing
733   void CContext::recvPostProcessing(CBufferIn& buffer)
734   {
735      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
736      postProcessing();
737   }
738
739   const StdString& CContext::getIdServer()
740   {
741      if (hasClient)
742      {
743        idServer_ = this->getId();
744        idServer_ += "_server";
745        return idServer_;
746      }
747      if (hasServer) return (this->getId());
748   }
749
750   /*!
751   \brief Do some simple post processings after parsing xml file
752      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
753   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
754   which will be written out into netcdf files, are processed
755   */
756   void CContext::postProcessing()
757   {
758     if (isPostProcessed) return;
759
760      // Make sure the calendar was correctly created
761      if (!calendar)
762        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
763      else if (calendar->getTimeStep() == NoneDu)
764        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
765      // Calendar first update to set the current date equals to the start date
766      calendar->update(0);
767
768      // Find all inheritance in xml structure
769      this->solveAllInheritance();
770
771      // Check if some axis, domains or grids are eligible to for compressed indexed output.
772      // Warning: This must be done after solving the inheritance and before the rest of post-processing
773      checkAxisDomainsGridsEligibilityForCompressedOutput();
774
775      // Check if some automatic time series should be generated
776      // Warning: This must be done after solving the inheritance and before the rest of post-processing
777      prepareTimeseries();
778
779      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
780      this->findEnabledFiles();
781      this->findEnabledReadModeFiles();
782
783      // Find all enabled fields of each file
784      this->findAllEnabledFields();
785
786      // Search and rebuild all reference object of enabled fields
787      this->solveAllRefOfEnabledFields(false);
788
789      // Find all fields with read access from the public API
790      findFieldsWithReadAccess();
791      // and solve the all reference for them
792      solveAllRefOfFieldsWithReadAccess();
793
794      isPostProcessed = true;
795   }
796
797   std::map<int, StdSize>& CContext::getAttributesBufferSize()
798   {
799     std::map<int, StdSize> attributesSize;
800
801     size_t numEnabledFiles = this->enabledFiles.size();
802     for (size_t i = 0; i < numEnabledFiles; ++i)
803     {
804       CFile* file = this->enabledFiles[i];
805
806       std::vector<CField*> enabledFields = file->getEnabledFields();
807       size_t numEnabledFields = enabledFields.size();
808       for (size_t j = 0; j < numEnabledFields; ++j)
809       {
810         const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
811         std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
812         for (; it != itE; ++it)
813         {
814           // If attributesSize[it->first] does not exist, it will be zero-initialized
815           // so we can use it safely without checking for its existance
816           if (attributesSize[it->first] < it->second)
817             attributesSize[it->first] = it->second;
818         }
819       }
820     }
821
822     return attributesSize;
823   }
824
825   std::map<int, StdSize>& CContext::getDataBufferSize()
826   {
827     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
828
829     std::map<int, StdSize> dataSize;
830
831     // Find all reference domain and axis of all active fields
832     size_t numEnabledFiles = this->enabledFiles.size();
833     for (size_t i = 0; i < numEnabledFiles; ++i)
834     {
835       CFile* file = this->enabledFiles[i];
836       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
837
838       if (fileMode == mode)
839       {
840         std::vector<CField*> enabledFields = file->getEnabledFields();
841         size_t numEnabledFields = enabledFields.size();
842         for (size_t j = 0; j < numEnabledFields; ++j)
843         {
844           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
845           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
846           for (; it != itE; ++it)
847           {
848             // If dataSize[it->first] does not exist, it will be zero-initialized
849             // so we can use it safely without checking for its existance
850             if (CXios::isOptPerformance)
851               dataSize[it->first] += it->second;
852             else if (dataSize[it->first] < it->second)
853               dataSize[it->first] = it->second;
854           }
855         }
856       }
857     }
858
859     return dataSize;
860   }
861
862   //! Client side: Send infomation of active files (files are enabled to write out)
863   void CContext::sendEnabledFiles()
864   {
865     int size = this->enabledFiles.size();
866
867     // In a context, each type has a root definition, e.g: axis, domain, field.
868     // Every object must be a child of one of these root definition. In this case
869     // all new file objects created on server must be children of the root "file_definition"
870     StdString fileDefRoot("file_definition");
871     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
872
873     for (int i = 0; i < size; ++i)
874     {
875       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
876       this->enabledFiles[i]->sendAllAttributesToServer();
877       this->enabledFiles[i]->sendAddAllVariables();
878     }
879   }
880
881   //! Client side: Send information of active fields (ones are written onto files)
882   void CContext::sendEnabledFields()
883   {
884     int size = this->enabledFiles.size();
885     for (int i = 0; i < size; ++i)
886     {
887       this->enabledFiles[i]->sendEnabledFields();
888     }
889   }
890
891   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
892   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
893   {
894     if (!hasClient) return;
895
896     const vector<CAxis*> allAxis = CAxis::getAll();
897     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
898       (*it)->checkEligibilityForCompressedOutput();
899
900     const vector<CDomain*> allDomains = CDomain::getAll();
901     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
902       (*it)->checkEligibilityForCompressedOutput();
903
904     const vector<CGrid*> allGrids = CGrid::getAll();
905     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
906       (*it)->checkEligibilityForCompressedOutput();
907   }
908
909   //! Client side: Prepare the timeseries by adding the necessary files
910   void CContext::prepareTimeseries()
911   {
912     if (!hasClient) return;
913
914     const std::vector<CFile*> allFiles = CFile::getAll();
915     for (size_t i = 0; i < allFiles.size(); i++)
916     {
917       CFile* file = allFiles[i];
918
919       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
920       {
921         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : (!file->name.isEmpty() ? file->name : file->getId());
922
923         const std::vector<CField*> allFields = file->getAllFields();
924         for (size_t j = 0; j < allFields.size(); j++)
925         {
926           CField* field = allFields[j];
927
928           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
929           {
930             CFile* tsFile = CFile::create();
931             tsFile->duplicateAttributes(file);
932
933             tsFile->name = tsPrefix + "_";
934             if (!field->name.isEmpty())
935               tsFile->name.get() += field->name;
936             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
937               tsFile->name.get() += field->field_ref;
938             else
939               tsFile->name.get() += field->getId();
940
941             if (!field->ts_split_freq.isEmpty())
942               tsFile->split_freq = field->ts_split_freq;
943
944             CField* tsField = tsFile->addField();
945             tsField->field_ref = field->getId();
946
947             tsFile->solveFieldRefInheritance(true);
948
949             if (file->timeseries == CFile::timeseries_attr::exclusive)
950               field->enabled = false;
951           }
952         }
953
954         // Finally disable the original file is need be
955         if (file->timeseries == CFile::timeseries_attr::only)
956          file->enabled = false;
957       }
958     }
959   }
960
961   //! Client side: Send information of reference grid of active fields
962   void CContext::sendRefGrid()
963   {
964     std::set<StdString> gridIds;
965     int sizeFile = this->enabledFiles.size();
966     CFile* filePtr(NULL);
967
968     // Firstly, find all reference grids of all active fields
969     for (int i = 0; i < sizeFile; ++i)
970     {
971       filePtr = this->enabledFiles[i];
972       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
973       int sizeField = enabledFields.size();
974       for (int numField = 0; numField < sizeField; ++numField)
975       {
976         if (0 != enabledFields[numField]->getRelGrid())
977           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
978       }
979     }
980
981     // Create all reference grids on server side
982     StdString gridDefRoot("grid_definition");
983     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
984     std::set<StdString>::const_iterator it, itE = gridIds.end();
985     for (it = gridIds.begin(); it != itE; ++it)
986     {
987       gridPtr->sendCreateChild(*it);
988       CGrid::get(*it)->sendAllAttributesToServer();
989       CGrid::get(*it)->sendAllDomains();
990       CGrid::get(*it)->sendAllAxis();
991     }
992   }
993
994
995   //! Client side: Send information of reference domain and axis of active fields
996   void CContext::sendRefDomainsAxis()
997   {
998     std::set<StdString> domainIds;
999     std::set<StdString> axisIds;
1000
1001     // Find all reference domain and axis of all active fields
1002     int numEnabledFiles = this->enabledFiles.size();
1003     for (int i = 0; i < numEnabledFiles; ++i)
1004     {
1005       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1006       int numEnabledFields = enabledFields.size();
1007       for (int j = 0; j < numEnabledFields; ++j)
1008       {
1009         const std::pair<StdString, StdString>& prDomAxisId = enabledFields[j]->getRefDomainAxisIds();
1010         if ("" != prDomAxisId.first) domainIds.insert(prDomAxisId.first);
1011         if ("" != prDomAxisId.second) axisIds.insert(prDomAxisId.second);
1012       }
1013     }
1014
1015     // Create all reference axis on server side
1016     std::set<StdString>::iterator itDom, itAxis;
1017     std::set<StdString>::const_iterator itE;
1018
1019     StdString axiDefRoot("axis_definition");
1020     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1021     itE = axisIds.end();
1022     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1023     {
1024       if (!itAxis->empty())
1025       {
1026         axisPtr->sendCreateChild(*itAxis);
1027         CAxis::get(*itAxis)->sendAllAttributesToServer();
1028       }
1029     }
1030
1031     // Create all reference domains on server side
1032     StdString domDefRoot("domain_definition");
1033     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1034     itE = domainIds.end();
1035     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1036     {
1037       if (!itDom->empty()) {
1038          domPtr->sendCreateChild(*itDom);
1039          CDomain::get(*itDom)->sendAllAttributesToServer();
1040       }
1041     }
1042   }
1043
1044   //! Update calendar in each time step
1045   void CContext::updateCalendar(int step)
1046   {
1047      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1048      calendar->update(step);
1049      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1050
1051      if (hasClient)
1052      {
1053        checkPrefetchingOfEnabledReadModeFiles();
1054        garbageCollector.invalidate(calendar->getCurrentDate());
1055      }
1056   }
1057
1058   //! Server side: Create header of netcdf file
1059   void CContext::createFileHeader(void )
1060   {
1061      vector<CFile*>::const_iterator it;
1062
1063      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1064      {
1065         (*it)->initFile();
1066      }
1067   }
1068
1069   //! Get current context
1070   CContext* CContext::getCurrent(void)
1071   {
1072     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1073   }
1074
1075   /*!
1076   \brief Set context with an id be the current context
1077   \param [in] id identity of context to be set to current
1078   */
1079   void CContext::setCurrent(const string& id)
1080   {
1081     CObjectFactory::SetCurrentContextId(id);
1082     CGroupFactory::SetCurrentContextId(id);
1083   }
1084
1085  /*!
1086  \brief Create a context with specific id
1087  \param [in] id identity of new context
1088  \return pointer to the new context or already-existed one with identity id
1089  */
1090  CContext* CContext::create(const StdString& id)
1091  {
1092    CContext::setCurrent(id);
1093
1094    bool hasctxt = CContext::has(id);
1095    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1096    getRoot();
1097    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1098
1099#define DECLARE_NODE(Name_, name_) \
1100    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1101#define DECLARE_NODE_PAR(Name_, name_)
1102#include "node_type.conf"
1103
1104    return (context);
1105  }
1106
1107
1108
1109     //! Server side: Receive a message to do some post processing
1110  void CContext::recvRegistry(CEventServer& event)
1111  {
1112    CBufferIn* buffer=event.subEvents.begin()->buffer;
1113    string id;
1114    *buffer>>id;
1115    get(id)->recvRegistry(*buffer);
1116  }
1117
1118  void CContext::recvRegistry(CBufferIn& buffer)
1119  {
1120    if (server->intraCommRank==0)
1121    {
1122      CRegistry registry(server->intraComm) ;
1123      registry.fromBuffer(buffer) ;
1124      registryOut->mergeRegistry(registry) ;
1125    }
1126  }
1127
1128  void CContext::sendRegistry(void)
1129  {
1130    registryOut->hierarchicalGatherRegistry() ;
1131
1132    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1133    if (client->isServerLeader())
1134    {
1135       CMessage msg ;
1136       msg<<this->getIdServer();
1137       if (client->clientRank==0) msg<<*registryOut ;
1138       const std::list<int>& ranks = client->getRanksServerLeader();
1139       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1140         event.push(*itRank,1,msg);
1141       client->sendEvent(event);
1142     }
1143     else client->sendEvent(event);
1144  }
1145
1146} // namespace xios
Note: See TracBrowser for help on using the repository browser.