source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1053

Last change on this file since 1053 was 1053, checked in by yushan, 6 years ago

ep_lib namespace specified when netcdf involved

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.3 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     ep_lib::MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          //registryOut->gatherRegistry() ;
372          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
373        }
374
375        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
376          MPI_Comm_free(&(*it));
377        comms.clear();
378      }
379   }
380
381   /*!
382   \brief Close all the context defintion and do processing data
383      After everything is well defined on client side, they will be processed and sent to server
384   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
385   all necessary information to server, from which each server can build its own database.
386   Because the role of server is to write out field data on a specific netcdf file,
387   the only information that it needs is the enabled files
388   and the active fields (fields will be written onto active files)
389   */
390   void CContext::closeDefinition(void)
391   {
392     int myRank;
393     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
394
395     //printf("myRank = %d, hasClient = %d, hasServer = %d\n", myRank, hasClient, hasServer);
396
397     // There is nothing client need to send to server
398     if (hasClient)
399     {
400       // After xml is parsed, there are some more works with post processing
401       postProcessing(); 
402       //printf("myRank = %d,                postProcessing OK\n", myRank);
403     }
404     setClientServerBuffer(); //printf("myRank = %d, setClientServerBuffer OK\n", myRank);
405
406     if (hasClient && !hasServer)
407     {
408      // Send all attributes of current context to server
409      this->sendAllAttributesToServer(); //printf("myRank = %d, this->sendAllAttributesToServer OK\n", myRank);
410
411      // Send all attributes of current calendar
412      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
413      //printf("myRank = %d, CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer OK\n", myRank);
414
415      // We have enough information to send to server
416      // First of all, send all enabled files
417       sendEnabledFiles();  //printf("myRank = %d, sendEnabledFiles OK\n", myRank);
418
419      // Then, send all enabled fields
420       sendEnabledFields();  //printf("myRank = %d, sendEnabledFields OK\n", myRank);
421
422      // At last, we have all info of domain and axis, then send them
423       sendRefDomainsAxis();  //printf("myRank = %d, sendRefDomainsAxis OK\n", myRank);
424
425      // After that, send all grid (if any)
426       sendRefGrid(); //printf("myRank = %d, sendRefGrid OK\n", myRank);
427    }
428
429    // We have a xml tree on the server side and now, it should be also processed
430    if (hasClient && !hasServer) sendPostProcessing(); 
431
432    // There are some processings that should be done after all of above. For example: check mask or index
433    if (hasClient)
434    {
435      this->buildFilterGraphOfEnabledFields();  //printf("myRank = %d, buildFilterGraphOfEnabledFields OK\n", myRank);
436      buildFilterGraphOfFieldsWithReadAccess();  //printf("myRank = %d, buildFilterGraphOfFieldsWithReadAccess OK\n", myRank);
437      this->solveAllRefOfEnabledFields(true);  //printf("myRank = %d, solveAllRefOfEnabledFields OK\n", myRank);
438    }
439
440    // Now tell server that it can process all messages from client
441    if (hasClient && !hasServer) this->sendCloseDefinition();
442
443    // Nettoyage de l'arborescence
444    if (hasClient && !hasServer) CleanTree();
445
446    if (hasClient)
447    {
448      sendCreateFileHeader();  //printf("myRank = %d, sendCreateFileHeader OK\n", myRank);
449
450      startPrefetchingOfEnabledReadModeFiles();  //printf("myRank = %d, startPrefetchingOfEnabledReadModeFiles OK\n", myRank);
451    }
452   }
453
454   void CContext::findAllEnabledFields(void)
455   {
456     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
457     (void)this->enabledFiles[i]->getEnabledFields();
458   }
459
460   void CContext::findAllEnabledFieldsInReadModeFiles(void)
461   {
462     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
463     (void)this->enabledReadModeFiles[i]->getEnabledFields();
464   }
465
466   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
467   {
468      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
469        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
470   }
471
472   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
473   {
474     int size = this->enabledFiles.size();
475     for (int i = 0; i < size; ++i)
476     {
477       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
478     }
479
480     for (int i = 0; i < size; ++i)
481     {
482       this->enabledFiles[i]->generateNewTransformationGridDest();
483     }
484   }
485
486   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
487   {
488     int size = this->enabledFiles.size();
489     
490     for (int i = 0; i < size; ++i)
491     {
492       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
493     }
494   }
495
496   void CContext::buildFilterGraphOfEnabledFields()
497   {
498     int size = this->enabledFiles.size();
499     for (int i = 0; i < size; ++i)
500     {
501       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
502     }
503   }
504
505   void CContext::startPrefetchingOfEnabledReadModeFiles()
506   {
507     int size = enabledReadModeFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
511     }
512   }
513
514   void CContext::checkPrefetchingOfEnabledReadModeFiles()
515   {
516     int size = enabledReadModeFiles.size();
517     for (int i = 0; i < size; ++i)
518     {
519        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
520     }
521   }
522
523  void CContext::findFieldsWithReadAccess(void)
524  {
525    fieldsWithReadAccess.clear();
526    const vector<CField*> allFields = CField::getAll();
527    for (size_t i = 0; i < allFields.size(); ++i)
528    {
529      CField* field = allFields[i];
530
531      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
532        field->read_access = true;
533      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
534        fieldsWithReadAccess.push_back(field);
535    }
536  }
537
538  void CContext::solveAllRefOfFieldsWithReadAccess()
539  {
540    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
541      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
542  }
543
544  void CContext::buildFilterGraphOfFieldsWithReadAccess()
545  {
546    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
547      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
548  }
549
550   void CContext::solveAllInheritance(bool apply)
551   {
552     // Résolution des héritages descendants (càd des héritages de groupes)
553     // pour chacun des contextes.
554      solveDescInheritance(apply);
555
556     // Résolution des héritages par référence au niveau des fichiers.
557      const vector<CFile*> allFiles=CFile::getAll();
558      const vector<CGrid*> allGrids= CGrid::getAll();
559
560     //if (hasClient && !hasServer)
561      if (hasClient)
562      {
563        for (unsigned int i = 0; i < allFiles.size(); i++)
564          allFiles[i]->solveFieldRefInheritance(apply);
565      }
566
567      unsigned int vecSize = allGrids.size();
568      unsigned int i = 0;
569      for (i = 0; i < vecSize; ++i)
570        allGrids[i]->solveDomainAxisRefInheritance(apply);
571
572   }
573
574   void CContext::findEnabledFiles(void)
575   {
576      const std::vector<CFile*> allFiles = CFile::getAll();
577
578      for (unsigned int i = 0; i < allFiles.size(); i++)
579         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
580         {
581            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
582               enabledFiles.push_back(allFiles[i]);
583         }
584         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
585
586
587      if (enabledFiles.size() == 0)
588         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
589               << getId() << "\" !");
590   }
591
592   void CContext::findEnabledReadModeFiles(void)
593   {
594     int size = this->enabledFiles.size();
595     for (int i = 0; i < size; ++i)
596     {
597       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
598        enabledReadModeFiles.push_back(enabledFiles[i]);
599     }
600   }
601
602   void CContext::closeAllFile(void)
603   {
604     std::vector<CFile*>::const_iterator
605            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
606
607     for (; it != end; it++)
608     {
609       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
610       (*it)->close();
611     }
612   }
613
614   /*!
615   \brief Dispatch event received from client
616      Whenever a message is received in buffer of server, it will be processed depending on
617   its event type. A new event type should be added in the switch list to make sure
618   it processed on server side.
619   \param [in] event: Received message
620   */
621   bool CContext::dispatchEvent(CEventServer& event)
622   {
623
624      if (SuperClass::dispatchEvent(event)) return true;
625      else
626      {
627        switch(event.type)
628        {
629           case EVENT_ID_CLOSE_DEFINITION :
630             recvCloseDefinition(event);
631             return true;
632             break;
633           case EVENT_ID_UPDATE_CALENDAR:
634             recvUpdateCalendar(event);
635             return true;
636             break;
637           case EVENT_ID_CREATE_FILE_HEADER :
638             recvCreateFileHeader(event);
639             return true;
640             break;
641           case EVENT_ID_POST_PROCESS:
642             recvPostProcessing(event);
643             return true;
644            case EVENT_ID_SEND_REGISTRY:
645             recvRegistry(event);
646             return true;
647            break;
648
649           default :
650             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
651                    <<"Unknown Event");
652           return false;
653         }
654      }
655   }
656
657   //! Client side: Send a message to server to make it close
658   void CContext::sendCloseDefinition(void)
659   {
660     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
661     if (client->isServerLeader())
662     {
663       CMessage msg;
664       msg<<this->getIdServer();
665       const std::list<int>& ranks = client->getRanksServerLeader();
666       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
667         event.push(*itRank,1,msg);
668       client->sendEvent(event);
669     }
670     else client->sendEvent(event);
671   }
672
673   //! Server side: Receive a message of client announcing a context close
674   void CContext::recvCloseDefinition(CEventServer& event)
675   {
676
677      CBufferIn* buffer=event.subEvents.begin()->buffer;
678      string id;
679      *buffer>>id;
680      get(id)->closeDefinition();
681   }
682
683   //! Client side: Send a message to update calendar in each time step
684   void CContext::sendUpdateCalendar(int step)
685   {
686     if (!hasServer)
687     {
688       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
689       if (client->isServerLeader())
690       {
691         CMessage msg;
692         msg<<this->getIdServer()<<step;
693         const std::list<int>& ranks = client->getRanksServerLeader();
694         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
695           event.push(*itRank,1,msg);
696         client->sendEvent(event);
697       }
698       else client->sendEvent(event);
699     }
700   }
701
702   //! Server side: Receive a message of client annoucing calendar update
703   void CContext::recvUpdateCalendar(CEventServer& event)
704   {
705      CBufferIn* buffer=event.subEvents.begin()->buffer;
706      string id;
707      *buffer>>id;
708      get(id)->recvUpdateCalendar(*buffer);
709   }
710
711   //! Server side: Receive a message of client annoucing calendar update
712   void CContext::recvUpdateCalendar(CBufferIn& buffer)
713   {
714      int step;
715      buffer>>step;
716      updateCalendar(step);
717   }
718
719   //! Client side: Send a message to create header part of netcdf file
720   void CContext::sendCreateFileHeader(void)
721   {
722     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
723     if (client->isServerLeader())
724     {
725       CMessage msg;
726       msg<<this->getIdServer();
727       const std::list<int>& ranks = client->getRanksServerLeader();
728       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
729         event.push(*itRank,1,msg) ;
730       client->sendEvent(event);
731     }
732     else client->sendEvent(event);
733   }
734
735   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
736   void CContext::recvCreateFileHeader(CEventServer& event)
737   {
738      CBufferIn* buffer=event.subEvents.begin()->buffer;
739      string id;
740      *buffer>>id;
741      get(id)->recvCreateFileHeader(*buffer);
742   }
743
744   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
745   void CContext::recvCreateFileHeader(CBufferIn& buffer)
746   {
747      createFileHeader();
748   }
749
750   //! Client side: Send a message to do some post processing on server
751   void CContext::sendPostProcessing()
752   {
753     if (!hasServer)
754     {
755       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
756       if (client->isServerLeader())
757       {
758         CMessage msg;
759         msg<<this->getIdServer();
760         const std::list<int>& ranks = client->getRanksServerLeader();
761         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
762           event.push(*itRank,1,msg);
763         client->sendEvent(event);
764       }
765       else client->sendEvent(event);
766     }
767   }
768
769   //! Server side: Receive a message to do some post processing
770   void CContext::recvPostProcessing(CEventServer& event)
771   {
772      CBufferIn* buffer=event.subEvents.begin()->buffer;
773      string id;
774      *buffer>>id;
775      get(id)->recvPostProcessing(*buffer);
776   }
777
778   //! Server side: Receive a message to do some post processing
779   void CContext::recvPostProcessing(CBufferIn& buffer)
780   {
781      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
782      postProcessing();
783   }
784
785   const StdString& CContext::getIdServer()
786   {
787      if (hasClient)
788      {
789        idServer_ = this->getId();
790        idServer_ += "_server";
791        return idServer_;
792      }
793      if (hasServer) return (this->getId());
794   }
795
796   /*!
797   \brief Do some simple post processings after parsing xml file
798      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
799   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
800   which will be written out into netcdf files, are processed
801   */
802   void CContext::postProcessing()
803   {
804     int myRank;
805     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
806
807     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
808     if (isPostProcessed) return;
809
810      // Make sure the calendar was correctly created
811      if (!calendar)
812        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
813      else if (calendar->getTimeStep() == NoneDu)
814        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
815      // Calendar first update to set the current date equals to the start date
816      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
817
818      // Find all inheritance in xml structure
819      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
820
821      // Check if some axis, domains or grids are eligible to for compressed indexed output.
822      // Warning: This must be done after solving the inheritance and before the rest of post-processing
823      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
824
825      // Check if some automatic time series should be generated
826      // Warning: This must be done after solving the inheritance and before the rest of post-processing
827      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
828
829      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
830      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
831      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
832
833      // Find all enabled fields of each file
834      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
835      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
836
837     if (hasClient && !hasServer)
838     {
839      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
840      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
841     }
842
843      // Only search and rebuild all reference objects of enable fields, don't transform
844      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
845
846      // Search and rebuild all reference object of enabled fields
847      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
848
849      // Find all fields with read access from the public API
850      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
851      // and solve the all reference for them
852      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
853
854      isPostProcessed = true;
855   }
856
857   /*!
858    * Compute the required buffer size to send the attributes (mostly those grid related).
859    *
860    * \param maxEventSize [in/out] the size of the bigger event for each connected server
861    */
862   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
863   {
864     std::map<int, StdSize> attributesSize;
865
866     if (hasClient)
867     {
868       size_t numEnabledFiles = this->enabledFiles.size();
869       for (size_t i = 0; i < numEnabledFiles; ++i)
870       {
871         CFile* file = this->enabledFiles[i];
872
873         std::vector<CField*> enabledFields = file->getEnabledFields();
874         size_t numEnabledFields = enabledFields.size();
875         for (size_t j = 0; j < numEnabledFields; ++j)
876         {
877           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
878           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
879           for (; it != itE; ++it)
880           {
881             // If attributesSize[it->first] does not exist, it will be zero-initialized
882             // so we can use it safely without checking for its existance
883             if (attributesSize[it->first] < it->second)
884               attributesSize[it->first] = it->second;
885
886             if (maxEventSize[it->first] < it->second)
887               maxEventSize[it->first] = it->second;
888           }
889         }
890       }
891     }
892
893     return attributesSize;
894   }
895
896   /*!
897    * Compute the required buffer size to send the fields data.
898    *
899    * \param maxEventSize [in/out] the size of the bigger event for each connected server
900    */
901   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
902   {
903     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
904
905     std::map<int, StdSize> dataSize;
906
907     // Find all reference domain and axis of all active fields
908     size_t numEnabledFiles = this->enabledFiles.size();
909     for (size_t i = 0; i < numEnabledFiles; ++i)
910     {
911       CFile* file = this->enabledFiles[i];
912       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
913
914       if (fileMode == mode)
915       {
916         std::vector<CField*> enabledFields = file->getEnabledFields();
917         size_t numEnabledFields = enabledFields.size();
918         for (size_t j = 0; j < numEnabledFields; ++j)
919         {
920           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
921           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
922           for (; it != itE; ++it)
923           {
924             // If dataSize[it->first] does not exist, it will be zero-initialized
925             // so we can use it safely without checking for its existance
926             if (CXios::isOptPerformance)
927               dataSize[it->first] += it->second;
928             else if (dataSize[it->first] < it->second)
929               dataSize[it->first] = it->second;
930
931             if (maxEventSize[it->first] < it->second)
932               maxEventSize[it->first] = it->second;
933           }
934         }
935       }
936     }
937
938     return dataSize;
939   }
940
941   //! Client side: Send infomation of active files (files are enabled to write out)
942   void CContext::sendEnabledFiles()
943   {
944     int size = this->enabledFiles.size();
945
946     // In a context, each type has a root definition, e.g: axis, domain, field.
947     // Every object must be a child of one of these root definition. In this case
948     // all new file objects created on server must be children of the root "file_definition"
949     StdString fileDefRoot("file_definition");
950     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
951
952     for (int i = 0; i < size; ++i)
953     {
954       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
955       this->enabledFiles[i]->sendAllAttributesToServer();
956       this->enabledFiles[i]->sendAddAllVariables();
957     }
958   }
959
960   //! Client side: Send information of active fields (ones are written onto files)
961   void CContext::sendEnabledFields()
962   {
963     int size = this->enabledFiles.size();
964     for (int i = 0; i < size; ++i)
965     {
966       this->enabledFiles[i]->sendEnabledFields();
967     }
968   }
969
970   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
971   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
972   {
973     if (!hasClient) return;
974
975     const vector<CAxis*> allAxis = CAxis::getAll();
976     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
977       (*it)->checkEligibilityForCompressedOutput();
978
979     const vector<CDomain*> allDomains = CDomain::getAll();
980     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
981       (*it)->checkEligibilityForCompressedOutput();
982
983     const vector<CGrid*> allGrids = CGrid::getAll();
984     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
985       (*it)->checkEligibilityForCompressedOutput();
986   }
987
988   //! Client side: Prepare the timeseries by adding the necessary files
989   void CContext::prepareTimeseries()
990   {
991     if (!hasClient) return;
992
993     const std::vector<CFile*> allFiles = CFile::getAll();
994     for (size_t i = 0; i < allFiles.size(); i++)
995     {
996       CFile* file = allFiles[i];
997
998       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
999       {
1000         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1001
1002         const std::vector<CField*> allFields = file->getAllFields();
1003         for (size_t j = 0; j < allFields.size(); j++)
1004         {
1005           CField* field = allFields[j];
1006
1007           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1008           {
1009             CFile* tsFile = CFile::create();
1010             tsFile->duplicateAttributes(file);
1011             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1012
1013             tsFile->name = tsPrefix + "_";
1014             if (!field->name.isEmpty())
1015               tsFile->name.get() += field->name;
1016             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1017               tsFile->name.get() += field->field_ref;
1018             else
1019               tsFile->name.get() += field->getId();
1020
1021             if (!field->ts_split_freq.isEmpty())
1022               tsFile->split_freq = field->ts_split_freq;
1023
1024             CField* tsField = tsFile->addField();
1025             tsField->field_ref = field->getId();
1026             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1027
1028             tsFile->solveFieldRefInheritance(true);
1029
1030             if (file->timeseries == CFile::timeseries_attr::exclusive)
1031               field->enabled = false;
1032           }
1033         }
1034
1035         // Finally disable the original file is need be
1036         if (file->timeseries == CFile::timeseries_attr::only)
1037          file->enabled = false;
1038       }
1039     }
1040   }
1041
1042   //! Client side: Send information of reference grid of active fields
1043   void CContext::sendRefGrid()
1044   {
1045     std::set<StdString> gridIds;
1046     int sizeFile = this->enabledFiles.size();
1047     CFile* filePtr(NULL);
1048
1049     // Firstly, find all reference grids of all active fields
1050     for (int i = 0; i < sizeFile; ++i)
1051     {
1052       filePtr = this->enabledFiles[i];
1053       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1054       int sizeField = enabledFields.size();
1055       for (int numField = 0; numField < sizeField; ++numField)
1056       {
1057         if (0 != enabledFields[numField]->getRelGrid())
1058           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1059       }
1060     }
1061
1062     // Create all reference grids on server side
1063     StdString gridDefRoot("grid_definition");
1064     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1065     std::set<StdString>::const_iterator it, itE = gridIds.end();
1066     for (it = gridIds.begin(); it != itE; ++it)
1067     {
1068       gridPtr->sendCreateChild(*it);
1069       CGrid::get(*it)->sendAllAttributesToServer();
1070       CGrid::get(*it)->sendAllDomains();
1071       CGrid::get(*it)->sendAllAxis();
1072       CGrid::get(*it)->sendAllScalars();
1073     }
1074   }
1075
1076
1077   //! Client side: Send information of reference domain and axis of active fields
1078   void CContext::sendRefDomainsAxis()
1079   {
1080     std::set<StdString> domainIds, axisIds, scalarIds;
1081
1082     // Find all reference domain and axis of all active fields
1083     int numEnabledFiles = this->enabledFiles.size();
1084     for (int i = 0; i < numEnabledFiles; ++i)
1085     {
1086       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1087       int numEnabledFields = enabledFields.size();
1088       for (int j = 0; j < numEnabledFields; ++j)
1089       {
1090         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1091         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1092         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1093         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1094       }
1095     }
1096
1097     // Create all reference axis on server side
1098     std::set<StdString>::iterator itDom, itAxis, itScalar;
1099     std::set<StdString>::const_iterator itE;
1100
1101     StdString scalarDefRoot("scalar_definition");
1102     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1103     itE = scalarIds.end();
1104     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1105     {
1106       if (!itScalar->empty())
1107       {
1108         scalarPtr->sendCreateChild(*itScalar);
1109         CScalar::get(*itScalar)->sendAllAttributesToServer();
1110       }
1111     }
1112
1113     StdString axiDefRoot("axis_definition");
1114     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1115     itE = axisIds.end();
1116     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1117     {
1118       if (!itAxis->empty())
1119       {
1120         axisPtr->sendCreateChild(*itAxis);
1121         CAxis::get(*itAxis)->sendAllAttributesToServer();
1122       }
1123     }
1124
1125     // Create all reference domains on server side
1126     StdString domDefRoot("domain_definition");
1127     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1128     itE = domainIds.end();
1129     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1130     {
1131       if (!itDom->empty()) {
1132          domPtr->sendCreateChild(*itDom);
1133          CDomain::get(*itDom)->sendAllAttributesToServer();
1134       }
1135     }
1136   }
1137
1138   //! Update calendar in each time step
1139   void CContext::updateCalendar(int step)
1140   {
1141      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1142      calendar->update(step);
1143      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1144
1145      if (hasClient)
1146      {
1147        checkPrefetchingOfEnabledReadModeFiles();
1148        garbageCollector.invalidate(calendar->getCurrentDate());
1149      }
1150   }
1151
1152   //! Server side: Create header of netcdf file
1153   void CContext::createFileHeader(void )
1154   {
1155      vector<CFile*>::const_iterator it;
1156
1157      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1158      {
1159         (*it)->initFile();
1160      }
1161   }
1162
1163   //! Get current context
1164   CContext* CContext::getCurrent(void)
1165   {
1166     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1167   }
1168
1169   /*!
1170   \brief Set context with an id be the current context
1171   \param [in] id identity of context to be set to current
1172   */
1173   void CContext::setCurrent(const string& id)
1174   {
1175     CObjectFactory::SetCurrentContextId(id);
1176     CGroupFactory::SetCurrentContextId(id);
1177   }
1178
1179  /*!
1180  \brief Create a context with specific id
1181  \param [in] id identity of new context
1182  \return pointer to the new context or already-existed one with identity id
1183  */
1184  CContext* CContext::create(const StdString& id)
1185  {
1186    CContext::setCurrent(id);
1187
1188    bool hasctxt = CContext::has(id);
1189    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1190    getRoot();
1191    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1192
1193#define DECLARE_NODE(Name_, name_) \
1194    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1195#define DECLARE_NODE_PAR(Name_, name_)
1196#include "node_type.conf"
1197
1198    return (context);
1199  }
1200
1201
1202
1203     //! Server side: Receive a message to do some post processing
1204  void CContext::recvRegistry(CEventServer& event)
1205  {
1206    CBufferIn* buffer=event.subEvents.begin()->buffer;
1207    string id;
1208    *buffer>>id;
1209    get(id)->recvRegistry(*buffer);
1210  }
1211
1212  void CContext::recvRegistry(CBufferIn& buffer)
1213  {
1214    if (server->intraCommRank==0)
1215    {
1216      CRegistry registry(server->intraComm) ;
1217      registry.fromBuffer(buffer) ;
1218      registryOut->mergeRegistry(registry) ;
1219    }
1220  }
1221
1222  void CContext::sendRegistry(void)
1223  {
1224    registryOut->hierarchicalGatherRegistry() ;
1225
1226    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1227    if (client->isServerLeader())
1228    {
1229       CMessage msg ;
1230       msg<<this->getIdServer();
1231       if (client->clientRank==0) msg<<*registryOut ;
1232       const std::list<int>& ranks = client->getRanksServerLeader();
1233       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1234         event.push(*itRank,1,msg);
1235       client->sendEvent(event);
1236    }
1237    else client->sendEvent(event);
1238  }
1239
1240} // namespace xios
Note: See TracBrowser for help on using the repository browser.