source: XIOS/trunk/src/node/context.cpp @ 719

Last change on this file since 719 was 719, checked in by rlacroix, 9 years ago

Add a new configuration variable "min_buffer_size".

This allows the user to control the minimum buffer size.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 36.7 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , dataSize_(), idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , dataSize_(), idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t bufferSizeMin = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (bufferSizeMin < sizeof(C##Name_##Definition)) bufferSizeMin = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> bufferSize = getDataSize();
277     if (bufferSize.empty())
278     {
279       if (client->isServerLeader())
280       {
281         const std::list<int>& ranks = client->getRanksServerLeader();
282         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
283           bufferSize[*itRank] = bufferSizeMin;
284       }
285       else
286        return;
287     }
288     else
289     {
290       std::map<int, StdSize>::iterator it  = bufferSize.begin(),
291                                        ite = bufferSize.end();
292       for (; it != ite; ++it)
293         it->second = (it->second < bufferSizeMin) ? bufferSizeMin : it->second;
294     }
295
296     client->setBufferSize(bufferSize);
297   }
298
299   //! Verify whether a context is initialized
300   bool CContext::isInitialized(void)
301   {
302     return hasClient;
303   }
304
305   //! Initialize server
306   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
307   {
308     hasServer=true;
309     server = new CContextServer(this,intraComm,interComm);
310
311     registryIn=new CRegistry(intraComm);
312     registryIn->setPath(getId()) ;
313     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
314     registryIn->bcastRegistry() ;
315     registryOut=new CRegistry(intraComm) ;
316     registryOut->setPath(getId()) ;
317 
318     MPI_Comm intraCommClient, interCommClient;
319     if (cxtClient) // Attached mode
320     {
321       intraCommClient = intraComm;
322       interCommClient = interComm;
323     }
324     else
325     {
326       MPI_Comm_dup(intraComm, &intraCommClient);
327       comms.push_back(intraCommClient);
328       MPI_Comm_dup(interComm, &interCommClient);
329       comms.push_back(interCommClient);
330     }
331     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
332   }
333
334   //! Server side: Put server into a loop in order to listen message from client
335   bool CContext::eventLoop(void)
336   {
337     return server->eventLoop();
338   }
339
340   //! Try to send the buffers and receive possible answers
341   bool CContext::checkBuffersAndListen(void)
342   {
343     client->checkBuffers();
344     return server->eventLoop();
345   }
346
347   //! Terminate a context
348   void CContext::finalize(void)
349   {
350      if (!finalized)
351      {
352        finalized = true;
353        if (hasClient) sendRegistry() ;
354        client->finalize();
355        while (!server->hasFinished())
356        {
357          server->eventLoop();
358        }
359
360        if (hasServer)
361        {
362          closeAllFile();
363          registryOut->hierarchicalGatherRegistry() ;
364          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
365        }
366       
367        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
368          MPI_Comm_free(&(*it));
369        comms.clear();
370      }
371   }
372
373   /*!
374   \brief Close all the context defintion and do processing data
375      After everything is well defined on client side, they will be processed and sent to server
376   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
377   all necessary information to server, from which each server can build its own database.
378   Because the role of server is to write out field data on a specific netcdf file,
379   the only information that it needs is the enabled files
380   and the active fields (fields will be written onto active files)
381   */
382   void CContext::closeDefinition(void)
383   {
384     // There is nothing client need to send to server
385     if (hasClient)
386     {
387       // After xml is parsed, there are some more works with post processing
388       postProcessing();
389     }
390     setClientServerBuffer();
391
392     if (hasClient && !hasServer)
393     {
394      // Send all attributes of current context to server
395      this->sendAllAttributesToServer();
396
397      // Send all attributes of current calendar
398      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
399
400      // We have enough information to send to server
401      // First of all, send all enabled files
402       sendEnabledFiles();
403
404      // Then, send all enabled fields
405       sendEnabledFields();
406
407      // At last, we have all info of domain and axis, then send them
408       sendRefDomainsAxis();
409
410      // After that, send all grid (if any)
411       sendRefGrid();
412    }
413
414    // We have a xml tree on the server side and now, it should be also processed
415    if (hasClient && !hasServer) sendPostProcessing();
416
417    // There are some processings that should be done after all of above. For example: check mask or index
418    if (hasClient)
419    {
420      this->buildFilterGraphOfEnabledFields();
421      buildFilterGraphOfFieldsWithReadAccess();
422      this->solveAllRefOfEnabledFields(true);
423    }
424
425    // Now tell server that it can process all messages from client
426    if (hasClient && !hasServer) this->sendCloseDefinition();
427
428    // Nettoyage de l'arborescence
429    if (hasClient && !hasServer) CleanTree(); // Only on client side??
430
431    if (hasClient)
432    {
433      sendCreateFileHeader();
434
435      startPrefetchingOfEnabledReadModeFiles();
436    }
437   }
438
439   void CContext::findAllEnabledFields(void)
440   {
441     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
442     (void)this->enabledFiles[i]->getEnabledFields();
443   }
444
445   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
446   {
447     int size = this->enabledFiles.size();
448     for (int i = 0; i < size; ++i)
449     {
450       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
451     }
452   }
453
454   void CContext::buildFilterGraphOfEnabledFields()
455   {
456     int size = this->enabledFiles.size();
457     for (int i = 0; i < size; ++i)
458     {
459       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
460     }
461   }
462
463   void CContext::startPrefetchingOfEnabledReadModeFiles()
464   {
465     int size = enabledReadModeFiles.size();
466     for (int i = 0; i < size; ++i)
467     {
468        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
469     }
470   }
471
472   void CContext::checkPrefetchingOfEnabledReadModeFiles()
473   {
474     int size = enabledReadModeFiles.size();
475     for (int i = 0; i < size; ++i)
476     {
477        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
478     }
479   }
480
481  void CContext::findFieldsWithReadAccess(void)
482  {
483    fieldsWithReadAccess.clear();
484    const vector<CField*> allFields = CField::getAll();
485    for (size_t i = 0; i < allFields.size(); ++i)
486    {
487      if (allFields[i]->file && !allFields[i]->file->mode.isEmpty() && allFields[i]->file->mode.getValue() == CFile::mode_attr::read)
488        allFields[i]->read_access = true;
489      if (!allFields[i]->read_access.isEmpty() && allFields[i]->read_access.getValue())
490        fieldsWithReadAccess.push_back(allFields[i]);
491    }
492  }
493
494  void CContext::solveAllRefOfFieldsWithReadAccess()
495  {
496    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
497      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
498  }
499
500  void CContext::buildFilterGraphOfFieldsWithReadAccess()
501  {
502    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
503      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
504  }
505
506   void CContext::solveAllInheritance(bool apply)
507   {
508     // Résolution des héritages descendants (càd des héritages de groupes)
509     // pour chacun des contextes.
510      solveDescInheritance(apply);
511
512     // Résolution des héritages par référence au niveau des fichiers.
513      const vector<CFile*> allFiles=CFile::getAll();
514      const vector<CGrid*> allGrids= CGrid::getAll();
515
516     //if (hasClient && !hasServer)
517      if (hasClient)
518      {
519        for (unsigned int i = 0; i < allFiles.size(); i++)
520          allFiles[i]->solveFieldRefInheritance(apply);
521      }
522
523      unsigned int vecSize = allGrids.size();
524      unsigned int i = 0;
525      for (i = 0; i < vecSize; ++i)
526        allGrids[i]->solveDomainAxisRefInheritance(apply);
527
528   }
529
530   void CContext::findEnabledFiles(void)
531   {
532      const std::vector<CFile*> allFiles = CFile::getAll();
533
534      for (unsigned int i = 0; i < allFiles.size(); i++)
535         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
536         {
537            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
538               enabledFiles.push_back(allFiles[i]);
539         }
540         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
541
542
543      if (enabledFiles.size() == 0)
544         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
545               << getId() << "\" !");
546   }
547
548   void CContext::findEnabledReadModeFiles(void)
549   {
550     int size = this->enabledFiles.size();
551     for (int i = 0; i < size; ++i)
552     {
553       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
554        enabledReadModeFiles.push_back(enabledFiles[i]);
555     }
556   }
557
558   void CContext::closeAllFile(void)
559   {
560     std::vector<CFile*>::const_iterator
561            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
562
563     for (; it != end; it++)
564     {
565       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
566       (*it)->close();
567     }
568   }
569
570   /*!
571   \brief Dispatch event received from client
572      Whenever a message is received in buffer of server, it will be processed depending on
573   its event type. A new event type should be added in the switch list to make sure
574   it processed on server side.
575   \param [in] event: Received message
576   */
577   bool CContext::dispatchEvent(CEventServer& event)
578   {
579
580      if (SuperClass::dispatchEvent(event)) return true;
581      else
582      {
583        switch(event.type)
584        {
585           case EVENT_ID_CLOSE_DEFINITION :
586             recvCloseDefinition(event);
587             return true;
588             break;
589           case EVENT_ID_UPDATE_CALENDAR:
590             recvUpdateCalendar(event);
591             return true;
592             break;
593           case EVENT_ID_CREATE_FILE_HEADER :
594             recvCreateFileHeader(event);
595             return true;
596             break;
597           case EVENT_ID_POST_PROCESS:
598             recvPostProcessing(event);
599             return true;
600            case EVENT_ID_SEND_REGISTRY:
601             recvRegistry(event);
602             return true;
603            break;
604
605           default :
606             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
607                    <<"Unknown Event");
608           return false;
609         }
610      }
611   }
612
613   //! Client side: Send a message to server to make it close
614   void CContext::sendCloseDefinition(void)
615   {
616     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
617     if (client->isServerLeader())
618     {
619       CMessage msg;
620       msg<<this->getIdServer();
621       const std::list<int>& ranks = client->getRanksServerLeader();
622       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
623         event.push(*itRank,1,msg);
624       client->sendEvent(event);
625     }
626     else client->sendEvent(event);
627   }
628
629   //! Server side: Receive a message of client announcing a context close
630   void CContext::recvCloseDefinition(CEventServer& event)
631   {
632
633      CBufferIn* buffer=event.subEvents.begin()->buffer;
634      string id;
635      *buffer>>id;
636      get(id)->closeDefinition();
637   }
638
639   //! Client side: Send a message to update calendar in each time step
640   void CContext::sendUpdateCalendar(int step)
641   {
642     if (!hasServer)
643     {
644       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
645       if (client->isServerLeader())
646       {
647         CMessage msg;
648         msg<<this->getIdServer()<<step;
649         const std::list<int>& ranks = client->getRanksServerLeader();
650         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
651           event.push(*itRank,1,msg);
652         client->sendEvent(event);
653       }
654       else client->sendEvent(event);
655     }
656   }
657
658   //! Server side: Receive a message of client annoucing calendar update
659   void CContext::recvUpdateCalendar(CEventServer& event)
660   {
661      CBufferIn* buffer=event.subEvents.begin()->buffer;
662      string id;
663      *buffer>>id;
664      get(id)->recvUpdateCalendar(*buffer);
665   }
666
667   //! Server side: Receive a message of client annoucing calendar update
668   void CContext::recvUpdateCalendar(CBufferIn& buffer)
669   {
670      int step;
671      buffer>>step;
672      updateCalendar(step);
673   }
674
675   //! Client side: Send a message to create header part of netcdf file
676   void CContext::sendCreateFileHeader(void)
677   {
678     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
679     if (client->isServerLeader())
680     {
681       CMessage msg;
682       msg<<this->getIdServer();
683       const std::list<int>& ranks = client->getRanksServerLeader();
684       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
685         event.push(*itRank,1,msg) ;
686       client->sendEvent(event);
687     }
688     else client->sendEvent(event);
689   }
690
691   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
692   void CContext::recvCreateFileHeader(CEventServer& event)
693   {
694      CBufferIn* buffer=event.subEvents.begin()->buffer;
695      string id;
696      *buffer>>id;
697      get(id)->recvCreateFileHeader(*buffer);
698   }
699
700   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
701   void CContext::recvCreateFileHeader(CBufferIn& buffer)
702   {
703      createFileHeader();
704   }
705
706   //! Client side: Send a message to do some post processing on server
707   void CContext::sendPostProcessing()
708   {
709     if (!hasServer)
710     {
711       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
712       if (client->isServerLeader())
713       {
714         CMessage msg;
715         msg<<this->getIdServer();
716         const std::list<int>& ranks = client->getRanksServerLeader();
717         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
718           event.push(*itRank,1,msg);
719         client->sendEvent(event);
720       }
721       else client->sendEvent(event);
722     }
723   }
724
725   //! Server side: Receive a message to do some post processing
726   void CContext::recvPostProcessing(CEventServer& event)
727   {
728      CBufferIn* buffer=event.subEvents.begin()->buffer;
729      string id;
730      *buffer>>id;
731      get(id)->recvPostProcessing(*buffer);
732   }
733
734   //! Server side: Receive a message to do some post processing
735   void CContext::recvPostProcessing(CBufferIn& buffer)
736   {
737      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
738      postProcessing();
739   }
740
741   const StdString& CContext::getIdServer()
742   {
743      if (hasClient)
744      {
745        idServer_ = this->getId();
746        idServer_ += "_server";
747        return idServer_;
748      }
749      if (hasServer) return (this->getId());
750   }
751
752   /*!
753   \brief Do some simple post processings after parsing xml file
754      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
755   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
756   which will be written out into netcdf files, are processed
757   */
758   void CContext::postProcessing()
759   {
760     if (isPostProcessed) return;
761
762      // Make sure the calendar was correctly created
763      if (!calendar)
764        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
765      else if (calendar->getTimeStep() == NoneDu)
766        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
767      // Calendar first update to set the current date equals to the start date
768      calendar->update(0);
769
770      // Find all inheritance in xml structure
771      this->solveAllInheritance();
772
773      // Check if some axis, domains or grids are eligible to for compressed indexed output.
774      // Warning: This must be done after solving the inheritance and before the rest of post-processing
775      checkAxisDomainsGridsEligibilityForCompressedOutput();
776
777      // Check if some automatic time series should be generated
778      // Warning: This must be done after solving the inheritance and before the rest of post-processing
779      prepareTimeseries();
780
781      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
782      this->findEnabledFiles();
783      this->findEnabledReadModeFiles();
784
785      // Find all enabled fields of each file
786      this->findAllEnabledFields();
787
788      // Search and rebuild all reference object of enabled fields
789      this->solveAllRefOfEnabledFields(false);
790
791      // Find all fields with read access from the public API
792      findFieldsWithReadAccess();
793      // and solve the all reference for them
794      solveAllRefOfFieldsWithReadAccess();
795
796      isPostProcessed = true;
797   }
798
799   std::map<int, StdSize>& CContext::getDataSize()
800   {
801     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
802
803     // Set of grid used by enabled fields
804     std::set<StdString> usedGrid;
805
806     // Find all reference domain and axis of all active fields
807     int numEnabledFiles = this->enabledFiles.size();
808     for (int i = 0; i < numEnabledFiles; ++i)
809     {
810       CFile* file = this->enabledFiles[i];
811       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
812
813       if (fileMode == mode)
814       {
815         std::vector<CField*> enabledFields = file->getEnabledFields();
816         int numEnabledFields = enabledFields.size();
817         for (int j = 0; j < numEnabledFields; ++j)
818         {
819           StdString currentGrid = enabledFields[j]->grid->getId();
820           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataSize();
821           if (dataSize_.empty())
822           {
823             dataSize_ = mapSize;
824             usedGrid.insert(currentGrid);
825           }
826           else
827           {
828             std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
829             if (usedGrid.find(currentGrid) == usedGrid.end())
830             {
831               for (; it != itE; ++it)
832               {
833                 if (0 < dataSize_.count(it->first)) dataSize_[it->first] += it->second;
834                 else dataSize_.insert(make_pair(it->first, it->second));
835               }
836             } else
837             {
838               for (; it != itE; ++it)
839               {
840                 if (0 < dataSize_.count(it->first))
841                  if (CXios::isOptPerformance) dataSize_[it->first] += it->second;
842                  else
843                  {
844                    if (dataSize_[it->first] < it->second) dataSize_[it->first] = it->second;
845                  }
846                 else dataSize_.insert(make_pair(it->first, it->second));
847               }
848             }
849           }
850         }
851       }
852     }
853
854     return dataSize_;
855   }
856
857   //! Client side: Send infomation of active files (files are enabled to write out)
858   void CContext::sendEnabledFiles()
859   {
860     int size = this->enabledFiles.size();
861
862     // In a context, each type has a root definition, e.g: axis, domain, field.
863     // Every object must be a child of one of these root definition. In this case
864     // all new file objects created on server must be children of the root "file_definition"
865     StdString fileDefRoot("file_definition");
866     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
867
868     for (int i = 0; i < size; ++i)
869     {
870       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
871       this->enabledFiles[i]->sendAllAttributesToServer();
872       this->enabledFiles[i]->sendAddAllVariables();
873     }
874   }
875
876   //! Client side: Send information of active fields (ones are written onto files)
877   void CContext::sendEnabledFields()
878   {
879     int size = this->enabledFiles.size();
880     for (int i = 0; i < size; ++i)
881     {
882       this->enabledFiles[i]->sendEnabledFields();
883     }
884   }
885
886   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
887   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
888   {
889     if (!hasClient) return;
890
891     const vector<CAxis*> allAxis = CAxis::getAll();
892     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
893       (*it)->checkEligibilityForCompressedOutput();
894
895     const vector<CDomain*> allDomains = CDomain::getAll();
896     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
897       (*it)->checkEligibilityForCompressedOutput();
898
899     const vector<CGrid*> allGrids = CGrid::getAll();
900     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
901       (*it)->checkEligibilityForCompressedOutput();
902   }
903
904   //! Client side: Prepare the timeseries by adding the necessary files
905   void CContext::prepareTimeseries()
906   {
907     if (!hasClient) return;
908
909     const std::vector<CFile*> allFiles = CFile::getAll();
910     for (size_t i = 0; i < allFiles.size(); i++)
911     {
912       CFile* file = allFiles[i];
913
914       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
915       {
916         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : (!file->name.isEmpty() ? file->name : file->getId());
917
918         const std::vector<CField*> allFields = file->getAllFields();
919         for (size_t j = 0; j < allFields.size(); j++)
920         {
921           CField* field = allFields[j];
922
923           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
924           {
925             CFile* tsFile = CFile::create();
926             tsFile->duplicateAttributes(file);
927
928             tsFile->name = tsPrefix + "_";
929             if (!field->name.isEmpty())
930               tsFile->name.get() += field->name;
931             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
932               tsFile->name.get() += field->field_ref;
933             else
934               tsFile->name.get() += field->getId();
935
936             if (!field->ts_split_freq.isEmpty())
937               tsFile->split_freq = field->ts_split_freq;
938
939             CField* tsField = tsFile->addField();
940             tsField->field_ref = field->getId();
941
942             tsFile->solveFieldRefInheritance(true);
943
944             if (file->timeseries == CFile::timeseries_attr::exclusive)
945               field->enabled = false;
946           }
947         }
948
949         // Finally disable the original file is need be
950         if (file->timeseries == CFile::timeseries_attr::only)
951          file->enabled = false;
952       }
953     }
954   }
955
956   //! Client side: Send information of reference grid of active fields
957   void CContext::sendRefGrid()
958   {
959     std::set<StdString> gridIds;
960     int sizeFile = this->enabledFiles.size();
961     CFile* filePtr(NULL);
962
963     // Firstly, find all reference grids of all active fields
964     for (int i = 0; i < sizeFile; ++i)
965     {
966       filePtr = this->enabledFiles[i];
967       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
968       int sizeField = enabledFields.size();
969       for (int numField = 0; numField < sizeField; ++numField)
970       {
971         if (0 != enabledFields[numField]->getRelGrid())
972           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
973       }
974     }
975
976     // Create all reference grids on server side
977     StdString gridDefRoot("grid_definition");
978     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
979     std::set<StdString>::const_iterator it, itE = gridIds.end();
980     for (it = gridIds.begin(); it != itE; ++it)
981     {
982       gridPtr->sendCreateChild(*it);
983       CGrid::get(*it)->sendAllAttributesToServer();
984       CGrid::get(*it)->sendAllDomains();
985       CGrid::get(*it)->sendAllAxis();
986     }
987   }
988
989
990   //! Client side: Send information of reference domain and axis of active fields
991   void CContext::sendRefDomainsAxis()
992   {
993     std::set<StdString> domainIds;
994     std::set<StdString> axisIds;
995
996     // Find all reference domain and axis of all active fields
997     int numEnabledFiles = this->enabledFiles.size();
998     for (int i = 0; i < numEnabledFiles; ++i)
999     {
1000       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1001       int numEnabledFields = enabledFields.size();
1002       for (int j = 0; j < numEnabledFields; ++j)
1003       {
1004         const std::pair<StdString, StdString>& prDomAxisId = enabledFields[j]->getRefDomainAxisIds();
1005         if ("" != prDomAxisId.first) domainIds.insert(prDomAxisId.first);
1006         if ("" != prDomAxisId.second) axisIds.insert(prDomAxisId.second);
1007       }
1008     }
1009
1010     // Create all reference axis on server side
1011     std::set<StdString>::iterator itDom, itAxis;
1012     std::set<StdString>::const_iterator itE;
1013
1014     StdString axiDefRoot("axis_definition");
1015     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1016     itE = axisIds.end();
1017     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1018     {
1019       if (!itAxis->empty())
1020       {
1021         axisPtr->sendCreateChild(*itAxis);
1022         CAxis::get(*itAxis)->sendAllAttributesToServer();
1023       }
1024     }
1025
1026     // Create all reference domains on server side
1027     StdString domDefRoot("domain_definition");
1028     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1029     itE = domainIds.end();
1030     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1031     {
1032       if (!itDom->empty()) {
1033          domPtr->sendCreateChild(*itDom);
1034          CDomain::get(*itDom)->sendAllAttributesToServer();
1035       }
1036     }
1037   }
1038
1039   //! Update calendar in each time step
1040   void CContext::updateCalendar(int step)
1041   {
1042      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1043      calendar->update(step);
1044      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1045
1046      if (hasClient)
1047      {
1048        checkPrefetchingOfEnabledReadModeFiles();
1049        garbageCollector.invalidate(calendar->getCurrentDate());
1050      }
1051   }
1052
1053   //! Server side: Create header of netcdf file
1054   void CContext::createFileHeader(void )
1055   {
1056      vector<CFile*>::const_iterator it;
1057
1058      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1059      {
1060         (*it)->initFile();
1061      }
1062   }
1063
1064   //! Get current context
1065   CContext* CContext::getCurrent(void)
1066   {
1067     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1068   }
1069
1070   /*!
1071   \brief Set context with an id be the current context
1072   \param [in] id identity of context to be set to current
1073   */
1074   void CContext::setCurrent(const string& id)
1075   {
1076     CObjectFactory::SetCurrentContextId(id);
1077     CGroupFactory::SetCurrentContextId(id);
1078   }
1079
1080  /*!
1081  \brief Create a context with specific id
1082  \param [in] id identity of new context
1083  \return pointer to the new context or already-existed one with identity id
1084  */
1085  CContext* CContext::create(const StdString& id)
1086  {
1087    CContext::setCurrent(id);
1088
1089    bool hasctxt = CContext::has(id);
1090    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1091    getRoot();
1092    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1093
1094#define DECLARE_NODE(Name_, name_) \
1095    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1096#define DECLARE_NODE_PAR(Name_, name_)
1097#include "node_type.conf"
1098
1099    return (context);
1100  }
1101
1102
1103
1104     //! Server side: Receive a message to do some post processing
1105  void CContext::recvRegistry(CEventServer& event)
1106  {
1107    CBufferIn* buffer=event.subEvents.begin()->buffer;
1108    string id;
1109    *buffer>>id;
1110    get(id)->recvRegistry(*buffer);
1111  }
1112
1113  void CContext::recvRegistry(CBufferIn& buffer)
1114  {
1115    if (server->intraCommRank==0)
1116    {
1117      CRegistry registry(server->intraComm) ;
1118      registry.fromBuffer(buffer) ;
1119      registryOut->mergeRegistry(registry) ;
1120    }
1121  }
1122
1123  void CContext::sendRegistry(void)
1124  {
1125    registryOut->hierarchicalGatherRegistry() ;
1126
1127    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1128    if (client->isServerLeader())
1129    {
1130       CMessage msg ;
1131       msg<<this->getIdServer();
1132       if (client->clientRank==0) msg<<*registryOut ;
1133       const std::list<int>& ranks = client->getRanksServerLeader();
1134       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1135         event.push(*itRank,1,msg);
1136       client->sendEvent(event);
1137     }
1138     else client->sendEvent(event);
1139  }
1140
1141} // namespace xios
Note: See TracBrowser for help on using the repository browser.