source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1080

Last change on this file since 1080 was 1080, checked in by yushan, 7 years ago

Under development : save modification

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.5 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242
243     int tmp_rank;
244     MPI_Comm_rank(intraComm, &tmp_rank);
245     MPI_Barrier(intraComm);
246     
247     
248     
249     registryIn=new CRegistry(intraComm);
250
251     #pragma omp critical (_output)
252     printf("Client %d : registryIn=new CRegistry(intraComm), &registryIn = %p, registryIn = %p \n", tmp_rank, &registryIn, registryIn) ;
253
254     // registryIn=new CRegistry;
255     // registryIn->communicator = intraComm;
256     registryIn->setPath(getId()) ;
257     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
258     registryIn->bcastRegistry() ;
259
260     registryOut=new CRegistry(intraComm) ;
261     registryOut->setPath(getId()) ;
262     #pragma omp critical (_output)
263     printf("Client %d : registryOut->setPath(getId()) \n", tmp_rank) ;
264
265     ep_lib::MPI_Comm intraCommServer, interCommServer;
266     if (cxtServer) // Attached mode
267     {
268       intraCommServer = intraComm;
269       interCommServer = interComm;
270     }
271     else
272     {
273       MPI_Comm_dup(intraComm, &intraCommServer);
274       comms.push_back(intraCommServer);
275       MPI_Comm_dup(interComm, &interCommServer);
276       comms.push_back(interCommServer);
277       
278     }
279     server = new CContextServer(this,intraCommServer,interCommServer);
280   }
281
282   void CContext::setClientServerBuffer()
283   {
284     size_t minBufferSize = CXios::minBufferSize;
285#define DECLARE_NODE(Name_, name_)    \
286     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
287#define DECLARE_NODE_PAR(Name_, name_)
288#include "node_type.conf"
289#undef DECLARE_NODE
290#undef DECLARE_NODE_PAR
291
292     std::map<int, StdSize> maxEventSize;
293     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
294     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
295
296     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
297     for (it = dataBufferSize.begin(); it != ite; ++it)
298       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
299
300     ite = bufferSize.end();
301     for (it = bufferSize.begin(); it != ite; ++it)
302     {
303       it->second *= CXios::bufferSizeFactor;
304       if (it->second < minBufferSize) it->second = minBufferSize;
305     }
306
307     // We consider that the minimum buffer size is also the minimum event size
308     ite = maxEventSize.end();
309     for (it = maxEventSize.begin(); it != ite; ++it)
310       if (it->second < minBufferSize) it->second = minBufferSize;
311
312     if (client->isServerLeader())
313     {
314       const std::list<int>& ranks = client->getRanksServerLeader();
315       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
316         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
317     }
318
319     client->setBufferSize(bufferSize, maxEventSize);
320   }
321
322   //! Verify whether a context is initialized
323   bool CContext::isInitialized(void)
324   {
325     return hasClient;
326   }
327
328   //! Initialize server
329   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
330   {
331     hasServer=true;
332     server = new CContextServer(this,intraComm,interComm);
333
334     registryIn=new CRegistry(intraComm);
335     registryIn->setPath(getId()) ;
336     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
337     registryIn->bcastRegistry() ;
338     registryOut=new CRegistry(intraComm) ;
339     registryOut->setPath(getId()) ;
340
341     ep_lib::MPI_Comm intraCommClient, interCommClient;
342     if (cxtClient) // Attached mode
343     {
344       intraCommClient = intraComm;
345       interCommClient = interComm;
346     }
347     else
348     {
349       MPI_Comm_dup(intraComm, &intraCommClient);
350       comms.push_back(intraCommClient);
351       MPI_Comm_dup(interComm, &interCommClient);
352       comms.push_back(interCommClient);
353     }
354     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
355   }
356
357   //! Server side: Put server into a loop in order to listen message from client
358   bool CContext::eventLoop(void)
359   {
360     return server->eventLoop();
361   }
362
363   //! Try to send the buffers and receive possible answers
364   bool CContext::checkBuffersAndListen(void)
365   {
366     client->checkBuffers();
367     return server->eventLoop();
368   }
369
370   //! Terminate a context
371   void CContext::finalize(void)
372   {
373     
374      if (!finalized)
375      {
376        finalized = true;
377        if (hasClient) sendRegistry() ;
378        client->finalize();
379        while (!server->hasFinished())
380        {
381          server->eventLoop();
382        }
383
384        if (hasServer)
385        {
386          closeAllFile();
387          registryOut->hierarchicalGatherRegistry() ;
388          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
389        }
390
391        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
392          MPI_Comm_free(&(*it));
393        comms.clear();
394      }
395   }
396
397   /*!
398   \brief Close all the context defintion and do processing data
399      After everything is well defined on client side, they will be processed and sent to server
400   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
401   all necessary information to server, from which each server can build its own database.
402   Because the role of server is to write out field data on a specific netcdf file,
403   the only information that it needs is the enabled files
404   and the active fields (fields will be written onto active files)
405   */
406   void CContext::closeDefinition(void)
407   {
408
409     // There is nothing client need to send to server
410     if (hasClient)
411     {
412       // After xml is parsed, there are some more works with post processing
413       postProcessing(); 
414     }
415     setClientServerBuffer(); 
416
417     if (hasClient && !hasServer)
418     {
419      // Send all attributes of current context to server
420      this->sendAllAttributesToServer();
421
422      // Send all attributes of current calendar
423      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
424
425      // We have enough information to send to server
426      // First of all, send all enabled files
427       sendEnabledFiles(); 
428
429      // Then, send all enabled fields
430       sendEnabledFields(); 
431
432      // At last, we have all info of domain and axis, then send them
433       sendRefDomainsAxis(); 
434      // After that, send all grid (if any)
435       sendRefGrid(); 
436    }
437
438    // We have a xml tree on the server side and now, it should be also processed
439    if (hasClient && !hasServer) sendPostProcessing(); 
440
441    // There are some processings that should be done after all of above. For example: check mask or index
442    if (hasClient)
443    {
444      this->buildFilterGraphOfEnabledFields(); 
445      buildFilterGraphOfFieldsWithReadAccess(); 
446      this->solveAllRefOfEnabledFields(true); 
447    }
448
449    // Now tell server that it can process all messages from client
450    if (hasClient && !hasServer) this->sendCloseDefinition();
451
452    // Nettoyage de l'arborescence
453    if (hasClient && !hasServer) CleanTree();
454
455    if (hasClient)
456    {
457      sendCreateFileHeader(); 
458
459      startPrefetchingOfEnabledReadModeFiles(); 
460    }
461   }
462
463   void CContext::findAllEnabledFields(void)
464   {
465     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
466     (void)this->enabledFiles[i]->getEnabledFields();
467   }
468
469   void CContext::findAllEnabledFieldsInReadModeFiles(void)
470   {
471     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
472     (void)this->enabledReadModeFiles[i]->getEnabledFields();
473   }
474
475   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
476   {
477      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
478        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
479   }
480
481   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
482   {
483     int size = this->enabledFiles.size();
484     for (int i = 0; i < size; ++i)
485     {
486       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
487     }
488
489     for (int i = 0; i < size; ++i)
490     {
491       this->enabledFiles[i]->generateNewTransformationGridDest();
492     }
493   }
494
495   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
496   {
497     int size = this->enabledFiles.size();
498     
499     for (int i = 0; i < size; ++i)
500     {
501       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
502     }
503   }
504
505   void CContext::buildFilterGraphOfEnabledFields()
506   {
507     int size = this->enabledFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
511     }
512   }
513
514   void CContext::startPrefetchingOfEnabledReadModeFiles()
515   {
516     int size = enabledReadModeFiles.size();
517     for (int i = 0; i < size; ++i)
518     {
519        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
520     }
521   }
522
523   void CContext::checkPrefetchingOfEnabledReadModeFiles()
524   {
525     int size = enabledReadModeFiles.size();
526     for (int i = 0; i < size; ++i)
527     {
528        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
529     }
530   }
531
532  void CContext::findFieldsWithReadAccess(void)
533  {
534    fieldsWithReadAccess.clear();
535    const vector<CField*> allFields = CField::getAll();
536    for (size_t i = 0; i < allFields.size(); ++i)
537    {
538      CField* field = allFields[i];
539
540      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
541        field->read_access = true;
542      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
543        fieldsWithReadAccess.push_back(field);
544    }
545  }
546
547  void CContext::solveAllRefOfFieldsWithReadAccess()
548  {
549    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
550      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
551  }
552
553  void CContext::buildFilterGraphOfFieldsWithReadAccess()
554  {
555    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
556      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
557  }
558
559   void CContext::solveAllInheritance(bool apply)
560   {
561     // Rsolution des hritages descendants (cd des hritages de groupes)
562     // pour chacun des contextes.
563      solveDescInheritance(apply);
564
565     // Rsolution des hritages par rfrence au niveau des fichiers.
566      const vector<CFile*> allFiles=CFile::getAll();
567      const vector<CGrid*> allGrids= CGrid::getAll();
568
569     //if (hasClient && !hasServer)
570      if (hasClient)
571      {
572        for (unsigned int i = 0; i < allFiles.size(); i++)
573          allFiles[i]->solveFieldRefInheritance(apply);
574      }
575
576      unsigned int vecSize = allGrids.size();
577      unsigned int i = 0;
578      for (i = 0; i < vecSize; ++i)
579        allGrids[i]->solveDomainAxisRefInheritance(apply);
580
581   }
582
583   void CContext::findEnabledFiles(void)
584   {
585      const std::vector<CFile*> allFiles = CFile::getAll();
586
587      for (unsigned int i = 0; i < allFiles.size(); i++)
588         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
589         {
590            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
591               enabledFiles.push_back(allFiles[i]);
592         }
593         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
594
595
596      if (enabledFiles.size() == 0)
597         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
598               << getId() << "\" !");
599   }
600
601   void CContext::findEnabledReadModeFiles(void)
602   {
603     int size = this->enabledFiles.size();
604     for (int i = 0; i < size; ++i)
605     {
606       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
607        enabledReadModeFiles.push_back(enabledFiles[i]);
608     }
609   }
610
611   void CContext::closeAllFile(void)
612   {
613     std::vector<CFile*>::const_iterator
614            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
615
616     for (; it != end; it++)
617     {
618       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
619       (*it)->close();
620     }
621   }
622
623   /*!
624   \brief Dispatch event received from client
625      Whenever a message is received in buffer of server, it will be processed depending on
626   its event type. A new event type should be added in the switch list to make sure
627   it processed on server side.
628   \param [in] event: Received message
629   */
630   bool CContext::dispatchEvent(CEventServer& event)
631   {
632
633      if (SuperClass::dispatchEvent(event)) return true;
634      else
635      {
636        switch(event.type)
637        {
638           case EVENT_ID_CLOSE_DEFINITION :
639             recvCloseDefinition(event);
640             return true;
641             break;
642           case EVENT_ID_UPDATE_CALENDAR:
643             recvUpdateCalendar(event);
644             return true;
645             break;
646           case EVENT_ID_CREATE_FILE_HEADER :
647             recvCreateFileHeader(event);
648             return true;
649             break;
650           case EVENT_ID_POST_PROCESS:
651             recvPostProcessing(event);
652             return true;
653            case EVENT_ID_SEND_REGISTRY:
654             recvRegistry(event);
655             return true;
656            break;
657
658           default :
659             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
660                    <<"Unknown Event");
661           return false;
662         }
663      }
664   }
665
666   //! Client side: Send a message to server to make it close
667   void CContext::sendCloseDefinition(void)
668   {
669     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
670     if (client->isServerLeader())
671     {
672       CMessage msg;
673       msg<<this->getIdServer();
674       const std::list<int>& ranks = client->getRanksServerLeader();
675       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
676         event.push(*itRank,1,msg);
677       client->sendEvent(event);
678     }
679     else client->sendEvent(event);
680   }
681
682   //! Server side: Receive a message of client announcing a context close
683   void CContext::recvCloseDefinition(CEventServer& event)
684   {
685
686      CBufferIn* buffer=event.subEvents.begin()->buffer;
687      string id;
688      *buffer>>id;
689      get(id)->closeDefinition();
690   }
691
692   //! Client side: Send a message to update calendar in each time step
693   void CContext::sendUpdateCalendar(int step)
694   {
695     if (!hasServer)
696     {
697       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
698       if (client->isServerLeader())
699       {
700         CMessage msg;
701         msg<<this->getIdServer()<<step;
702         const std::list<int>& ranks = client->getRanksServerLeader();
703         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
704           event.push(*itRank,1,msg);
705         client->sendEvent(event);
706       }
707       else client->sendEvent(event);
708     }
709   }
710
711   //! Server side: Receive a message of client annoucing calendar update
712   void CContext::recvUpdateCalendar(CEventServer& event)
713   {
714      CBufferIn* buffer=event.subEvents.begin()->buffer;
715      string id;
716      *buffer>>id;
717      get(id)->recvUpdateCalendar(*buffer);
718   }
719
720   //! Server side: Receive a message of client annoucing calendar update
721   void CContext::recvUpdateCalendar(CBufferIn& buffer)
722   {
723      int step;
724      buffer>>step;
725      updateCalendar(step);
726   }
727
728   //! Client side: Send a message to create header part of netcdf file
729   void CContext::sendCreateFileHeader(void)
730   {
731     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
732     if (client->isServerLeader())
733     {
734       CMessage msg;
735       msg<<this->getIdServer();
736       const std::list<int>& ranks = client->getRanksServerLeader();
737       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
738         event.push(*itRank,1,msg) ;
739       client->sendEvent(event);
740     }
741     else client->sendEvent(event);
742   }
743
744   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
745   void CContext::recvCreateFileHeader(CEventServer& event)
746   {
747      CBufferIn* buffer=event.subEvents.begin()->buffer;
748      string id;
749      *buffer>>id;
750      get(id)->recvCreateFileHeader(*buffer);
751   }
752
753   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
754   void CContext::recvCreateFileHeader(CBufferIn& buffer)
755   {
756      createFileHeader();
757   }
758
759   //! Client side: Send a message to do some post processing on server
760   void CContext::sendPostProcessing()
761   {
762     if (!hasServer)
763     {
764       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
765       if (client->isServerLeader())
766       {
767         CMessage msg;
768         msg<<this->getIdServer();
769         const std::list<int>& ranks = client->getRanksServerLeader();
770         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
771           event.push(*itRank,1,msg);
772         client->sendEvent(event);
773       }
774       else client->sendEvent(event);
775     }
776   }
777
778   //! Server side: Receive a message to do some post processing
779   void CContext::recvPostProcessing(CEventServer& event)
780   {
781      CBufferIn* buffer=event.subEvents.begin()->buffer;
782      string id;
783      *buffer>>id;
784      get(id)->recvPostProcessing(*buffer);
785   }
786
787   //! Server side: Receive a message to do some post processing
788   void CContext::recvPostProcessing(CBufferIn& buffer)
789   {
790      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
791      postProcessing();
792   }
793
794   const StdString& CContext::getIdServer()
795   {
796      if (hasClient)
797      {
798        idServer_ = this->getId();
799        idServer_ += "_server";
800        return idServer_;
801      }
802      if (hasServer) return (this->getId());
803   }
804
805   /*!
806   \brief Do some simple post processings after parsing xml file
807      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
808   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
809   which will be written out into netcdf files, are processed
810   */
811   void CContext::postProcessing()
812   {
813     int myRank;
814     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
815
816     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
817     if (isPostProcessed) return;
818
819      // Make sure the calendar was correctly created
820      if (!calendar)
821        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
822      else if (calendar->getTimeStep() == NoneDu)
823        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
824      // Calendar first update to set the current date equals to the start date
825      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
826
827      // Find all inheritance in xml structure
828      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
829
830      // Check if some axis, domains or grids are eligible to for compressed indexed output.
831      // Warning: This must be done after solving the inheritance and before the rest of post-processing
832      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
833
834      // Check if some automatic time series should be generated
835      // Warning: This must be done after solving the inheritance and before the rest of post-processing
836      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
837
838      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
839      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
840      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
841
842      // Find all enabled fields of each file
843      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
844      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
845
846     if (hasClient && !hasServer)
847     {
848      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
849      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
850     }
851
852      // Only search and rebuild all reference objects of enable fields, don't transform
853      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
854
855      // Search and rebuild all reference object of enabled fields
856      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
857
858      // Find all fields with read access from the public API
859      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
860      // and solve the all reference for them
861      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
862
863      isPostProcessed = true;
864   }
865
866   /*!
867    * Compute the required buffer size to send the attributes (mostly those grid related).
868    *
869    * \param maxEventSize [in/out] the size of the bigger event for each connected server
870    */
871   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
872   {
873     std::map<int, StdSize> attributesSize;
874
875     if (hasClient)
876     {
877       size_t numEnabledFiles = this->enabledFiles.size();
878       for (size_t i = 0; i < numEnabledFiles; ++i)
879       {
880         CFile* file = this->enabledFiles[i];
881
882         std::vector<CField*> enabledFields = file->getEnabledFields();
883         size_t numEnabledFields = enabledFields.size();
884         for (size_t j = 0; j < numEnabledFields; ++j)
885         {
886           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
887           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
888           for (; it != itE; ++it)
889           {
890             // If attributesSize[it->first] does not exist, it will be zero-initialized
891             // so we can use it safely without checking for its existance
892             if (attributesSize[it->first] < it->second)
893               attributesSize[it->first] = it->second;
894
895             if (maxEventSize[it->first] < it->second)
896               maxEventSize[it->first] = it->second;
897           }
898         }
899       }
900     }
901
902     return attributesSize;
903   }
904
905   /*!
906    * Compute the required buffer size to send the fields data.
907    *
908    * \param maxEventSize [in/out] the size of the bigger event for each connected server
909    */
910   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
911   {
912     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
913
914     std::map<int, StdSize> dataSize;
915
916     // Find all reference domain and axis of all active fields
917     size_t numEnabledFiles = this->enabledFiles.size();
918     for (size_t i = 0; i < numEnabledFiles; ++i)
919     {
920       CFile* file = this->enabledFiles[i];
921       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
922
923       if (fileMode == mode)
924       {
925         std::vector<CField*> enabledFields = file->getEnabledFields();
926         size_t numEnabledFields = enabledFields.size();
927         for (size_t j = 0; j < numEnabledFields; ++j)
928         {
929           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
930           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
931           for (; it != itE; ++it)
932           {
933             // If dataSize[it->first] does not exist, it will be zero-initialized
934             // so we can use it safely without checking for its existance
935             if (CXios::isOptPerformance)
936               dataSize[it->first] += it->second;
937             else if (dataSize[it->first] < it->second)
938               dataSize[it->first] = it->second;
939
940             if (maxEventSize[it->first] < it->second)
941               maxEventSize[it->first] = it->second;
942           }
943         }
944       }
945     }
946
947     return dataSize;
948   }
949
950   //! Client side: Send infomation of active files (files are enabled to write out)
951   void CContext::sendEnabledFiles()
952   {
953     int size = this->enabledFiles.size();
954
955     // In a context, each type has a root definition, e.g: axis, domain, field.
956     // Every object must be a child of one of these root definition. In this case
957     // all new file objects created on server must be children of the root "file_definition"
958     StdString fileDefRoot("file_definition");
959     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
960
961     for (int i = 0; i < size; ++i)
962     {
963       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
964       this->enabledFiles[i]->sendAllAttributesToServer();
965       this->enabledFiles[i]->sendAddAllVariables();
966     }
967   }
968
969   //! Client side: Send information of active fields (ones are written onto files)
970   void CContext::sendEnabledFields()
971   {
972     int size = this->enabledFiles.size();
973     for (int i = 0; i < size; ++i)
974     {
975       this->enabledFiles[i]->sendEnabledFields();
976     }
977   }
978
979   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
980   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
981   {
982     if (!hasClient) return;
983
984     const vector<CAxis*> allAxis = CAxis::getAll();
985     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
986       (*it)->checkEligibilityForCompressedOutput();
987
988     const vector<CDomain*> allDomains = CDomain::getAll();
989     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
990       (*it)->checkEligibilityForCompressedOutput();
991
992     const vector<CGrid*> allGrids = CGrid::getAll();
993     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
994       (*it)->checkEligibilityForCompressedOutput();
995   }
996
997   //! Client side: Prepare the timeseries by adding the necessary files
998   void CContext::prepareTimeseries()
999   {
1000     if (!hasClient) return;
1001
1002     const std::vector<CFile*> allFiles = CFile::getAll();
1003     for (size_t i = 0; i < allFiles.size(); i++)
1004     {
1005       CFile* file = allFiles[i];
1006
1007       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1008       {
1009         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1010
1011         const std::vector<CField*> allFields = file->getAllFields();
1012         for (size_t j = 0; j < allFields.size(); j++)
1013         {
1014           CField* field = allFields[j];
1015
1016           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1017           {
1018             CFile* tsFile = CFile::create();
1019             tsFile->duplicateAttributes(file);
1020             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1021
1022             tsFile->name = tsPrefix + "_";
1023             if (!field->name.isEmpty())
1024               tsFile->name.get() += field->name;
1025             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1026               tsFile->name.get() += field->field_ref;
1027             else
1028               tsFile->name.get() += field->getId();
1029
1030             if (!field->ts_split_freq.isEmpty())
1031               tsFile->split_freq = field->ts_split_freq;
1032
1033             CField* tsField = tsFile->addField();
1034             tsField->field_ref = field->getId();
1035             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1036
1037             tsFile->solveFieldRefInheritance(true);
1038
1039             if (file->timeseries == CFile::timeseries_attr::exclusive)
1040               field->enabled = false;
1041           }
1042         }
1043
1044         // Finally disable the original file is need be
1045         if (file->timeseries == CFile::timeseries_attr::only)
1046          file->enabled = false;
1047       }
1048     }
1049   }
1050
1051   //! Client side: Send information of reference grid of active fields
1052   void CContext::sendRefGrid()
1053   {
1054     std::set<StdString> gridIds;
1055     int sizeFile = this->enabledFiles.size();
1056     CFile* filePtr(NULL);
1057
1058     // Firstly, find all reference grids of all active fields
1059     for (int i = 0; i < sizeFile; ++i)
1060     {
1061       filePtr = this->enabledFiles[i];
1062       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1063       int sizeField = enabledFields.size();
1064       for (int numField = 0; numField < sizeField; ++numField)
1065       {
1066         if (0 != enabledFields[numField]->getRelGrid())
1067           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1068       }
1069     }
1070
1071     // Create all reference grids on server side
1072     StdString gridDefRoot("grid_definition");
1073     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1074     std::set<StdString>::const_iterator it, itE = gridIds.end();
1075     for (it = gridIds.begin(); it != itE; ++it)
1076     {
1077       gridPtr->sendCreateChild(*it);
1078       CGrid::get(*it)->sendAllAttributesToServer();
1079       CGrid::get(*it)->sendAllDomains();
1080       CGrid::get(*it)->sendAllAxis();
1081       CGrid::get(*it)->sendAllScalars();
1082     }
1083   }
1084
1085
1086   //! Client side: Send information of reference domain and axis of active fields
1087   void CContext::sendRefDomainsAxis()
1088   {
1089     std::set<StdString> domainIds, axisIds, scalarIds;
1090
1091     // Find all reference domain and axis of all active fields
1092     int numEnabledFiles = this->enabledFiles.size();
1093     for (int i = 0; i < numEnabledFiles; ++i)
1094     {
1095       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1096       int numEnabledFields = enabledFields.size();
1097       for (int j = 0; j < numEnabledFields; ++j)
1098       {
1099         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1100         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1101         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1102         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1103       }
1104     }
1105
1106     // Create all reference axis on server side
1107     std::set<StdString>::iterator itDom, itAxis, itScalar;
1108     std::set<StdString>::const_iterator itE;
1109
1110     StdString scalarDefRoot("scalar_definition");
1111     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1112     itE = scalarIds.end();
1113     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1114     {
1115       if (!itScalar->empty())
1116       {
1117         scalarPtr->sendCreateChild(*itScalar);
1118         CScalar::get(*itScalar)->sendAllAttributesToServer();
1119       }
1120     }
1121
1122     StdString axiDefRoot("axis_definition");
1123     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1124     itE = axisIds.end();
1125     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1126     {
1127       if (!itAxis->empty())
1128       {
1129         axisPtr->sendCreateChild(*itAxis);
1130         CAxis::get(*itAxis)->sendAllAttributesToServer();
1131       }
1132     }
1133
1134     // Create all reference domains on server side
1135     StdString domDefRoot("domain_definition");
1136     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1137     itE = domainIds.end();
1138     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1139     {
1140       if (!itDom->empty()) {
1141          domPtr->sendCreateChild(*itDom);
1142          CDomain::get(*itDom)->sendAllAttributesToServer();
1143       }
1144     }
1145   }
1146
1147   //! Update calendar in each time step
1148   void CContext::updateCalendar(int step)
1149   {
1150      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1151      calendar->update(step);
1152      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1153
1154      if (hasClient)
1155      {
1156        checkPrefetchingOfEnabledReadModeFiles();
1157        garbageCollector.invalidate(calendar->getCurrentDate());
1158      }
1159   }
1160
1161   //! Server side: Create header of netcdf file
1162   void CContext::createFileHeader(void )
1163   {
1164      vector<CFile*>::const_iterator it;
1165
1166      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1167      {
1168         (*it)->initFile();
1169      }
1170   }
1171
1172   //! Get current context
1173   CContext* CContext::getCurrent(void)
1174   {
1175     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1176   }
1177
1178   /*!
1179   \brief Set context with an id be the current context
1180   \param [in] id identity of context to be set to current
1181   */
1182   void CContext::setCurrent(const string& id)
1183   {
1184     CObjectFactory::SetCurrentContextId(id);
1185     CGroupFactory::SetCurrentContextId(id);
1186   }
1187
1188  /*!
1189  \brief Create a context with specific id
1190  \param [in] id identity of new context
1191  \return pointer to the new context or already-existed one with identity id
1192  */
1193   //bkp
1194//   CContext* CContext::create(const StdString& id)
1195//   {
1196//     CContext::setCurrent(id);
1197
1198//     bool hasctxt = CContext::has(id);
1199//     CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1200//     getRoot();
1201//     if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1202
1203// #define DECLARE_NODE(Name_, name_) \
1204//     C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1205// #define DECLARE_NODE_PAR(Name_, name_)
1206// #include "node_type.conf"
1207
1208//     return (context);
1209//   }
1210
1211
1212  CContext* CContext::create(const StdString& id)
1213  {
1214    CContext::setCurrent(id);
1215
1216
1217    bool hasctxt = CContext::has(id);
1218    CContext* context[omp_get_num_threads()];
1219    for(int i=0; i<omp_get_num_threads(); i++)
1220    {
1221
1222     context[i] = CObjectFactory::CreateObject<CContext>(id).get();
1223     getRoot();
1224     if (!hasctxt) CGroupFactory::AddChild(root, context[i]->getShared());
1225
1226#define DECLARE_NODE(Name_, name_) \
1227     C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1228#define DECLARE_NODE_PAR(Name_, name_)
1229#include "node_type.conf"
1230    }
1231    int tmp_rank;
1232    MPI_Comm_rank(MPI_COMM_WORLD, &tmp_rank);
1233    printf("CContext::create : num_threads = %d, my_id = %d, return add = %p\n", omp_get_num_threads(), tmp_rank, &(context[omp_get_thread_num()]));
1234   
1235    return (context[omp_get_thread_num()]);
1236  }
1237
1238     //! Server side: Receive a message to do some post processing
1239  void CContext::recvRegistry(CEventServer& event)
1240  {
1241    CBufferIn* buffer=event.subEvents.begin()->buffer;
1242    string id;
1243    *buffer>>id;
1244    get(id)->recvRegistry(*buffer);
1245  }
1246
1247  void CContext::recvRegistry(CBufferIn& buffer)
1248  {
1249    if (server->intraCommRank==0)
1250    {
1251      CRegistry registry(server->intraComm) ;
1252      registry.fromBuffer(buffer) ;
1253      registryOut->mergeRegistry(registry) ;
1254    }
1255  }
1256
1257  void CContext::sendRegistry(void)
1258  {
1259    registryOut->hierarchicalGatherRegistry() ;
1260
1261    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1262    if (client->isServerLeader())
1263    {
1264       CMessage msg ;
1265       msg<<this->getIdServer();
1266       if (client->clientRank==0) msg<<*registryOut ;
1267       const std::list<int>& ranks = client->getRanksServerLeader();
1268       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1269         event.push(*itRank,1,msg);
1270       client->sendEvent(event);
1271    }
1272    else client->sendEvent(event);
1273  }
1274
1275} // namespace xios
Note: See TracBrowser for help on using the repository browser.