Ignore:
Timestamp:
01/11/17 15:14:22 (7 years ago)
Author:
mhnguyen
Message:

Merging working version of coupler

+) Add some changes of domain and axis: Retransfer the atttributes in a generic ways for each level of client (or server)
+) Remove some spoiled files from the previous commits

Test
+) No test

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/field.cpp

    r1021 r1025  
    2323#include "temporal_filter.hpp" 
    2424#include "spatial_transform_filter.hpp" 
     25#include "file_server_writer_filter.hpp" 
    2526 
    2627namespace xios{ 
     
    234235  void CField::recvUpdateData(CEventServer& event) 
    235236  { 
    236     vector<int> ranks; 
    237     vector<CBufferIn*> buffers; 
     237    std::map<int,CBufferIn*> rankBuffers; 
    238238 
    239239    list<CEventServer::SSubEvent>::iterator it; 
     
    245245      CBufferIn* buffer = it->buffer; 
    246246      *buffer >> fieldId; 
    247       ranks.push_back(rank); 
    248       buffers.push_back(buffer); 
    249     } 
    250     get(fieldId)->recvUpdateData(ranks,buffers); 
     247      rankBuffers[rank] = buffer; 
     248    } 
     249    get(fieldId)->recvUpdateData(rankBuffers); 
    251250  } 
    252251 
    253   void  CField::recvUpdateData(vector<int>& ranks, vector<CBufferIn*>& buffers) 
     252  void  CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 
    254253  { 
    255254    CContext* context = CContext::getCurrent(); 
    256255 
    257     if (data_srv.empty()) 
     256    size_t sizeData = 0; 
     257    if (0 == recvDataSrv.numElements()) 
     258    {       
     259      for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 
     260      { 
     261        sizeData += it->second.numElements(); 
     262      } 
     263 
     264      // Gather all data from different clients 
     265      recvDataSrv.resize(sizeData); 
     266      recvFoperationSrv = boost::shared_ptr<func::CFunctor>(new func::CInstant(recvDataSrv)); 
     267    } 
     268 
     269    CArray<double,1> recv_data_tmp(recvDataSrv.numElements()); 
     270    sizeData = 0; 
     271    const CDate& currDate = context->getCalendar()->getCurrentDate(); 
     272    const CDate opeDate      = last_operation_srv +freq_op + freq_operation_srv - freq_op; 
     273 
     274    if (opeDate <= currDate) 
    258275    { 
    259276      for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 
    260       { 
    261         int rank = it->first; 
    262         data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements()))); 
    263         foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank])))); 
    264       } 
    265     } 
     277      {         
     278        CArray<double,1> tmp;        
     279        *(rankBuffers[it->first]) >> tmp; 
     280        recv_data_tmp(Range(sizeData,sizeData+it->second.numElements()-1)) = tmp;   
     281        sizeData += it->second.numElements();       
     282      } 
     283    } 
     284 
     285    this->setData(recv_data_tmp); 
     286  } 
     287 
     288  void CField::writeUpdateData(const CArray<double,1>& data) 
     289  { 
     290    CContext* context = CContext::getCurrent(); 
    266291 
    267292    const CDate& currDate = context->getCalendar()->getCurrentDate(); 
     
    271296    if (opeDate <= currDate) 
    272297    { 
    273       for (int n = 0; n < ranks.size(); n++) 
    274       { 
    275         CArray<double,1> data_tmp; 
    276         *buffers[n] >> data_tmp; 
    277         (*foperation_srv[ranks[n]])(data_tmp); 
    278       } 
     298      (*recvFoperationSrv)(data); 
    279299      last_operation_srv = currDate; 
     300//        sendUpdateData(fieldData); 
     301        // Redirecting data to the correct secondary server 
     302        //int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 
     303        //int srvId = fileIdx % context->clientPrimServer.size(); 
     304        //sendUpdateData(fieldData, context->clientPrimServer[srvId]); 
    280305    } 
    281306 
    282307    if (writeDate < (currDate + freq_operation_srv)) 
    283308    { 
    284       for (int n = 0; n < ranks.size(); n++) 
    285       { 
    286         this->foperation_srv[ranks[n]]->final(); 
    287       } 
    288  
     309      recvFoperationSrv->final(); 
    289310      last_Write_srv = writeDate; 
    290     } 
    291  
    292     if (context->hasClient && context->hasServer) 
    293     { 
    294       size_t writtenSize; 
    295 //      if (field->getUseCompressedOutput()) 
    296 //        writtenSize = grid->getNumberWrittenIndexes(); 
    297 //      else 
    298         writtenSize = grid->getWrittenDataSize(); 
    299  
    300       CArray<double,1> fieldData(writtenSize); 
    301 //      if (!field->default_value.isEmpty()) fieldData = field->default_value; 
    302  
    303 //      if (field->getUseCompressedOutput()) 
    304 //        field->outputCompressedField(fieldData); 
    305 //      else 
    306         this->outputField(fieldData); 
    307 //        sendUpdateData(fieldData); 
    308         // Redirecting data to the correct secondary server 
    309         int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 
    310         int srvId = fileIdx % context->clientPrimServer.size(); 
    311         sendUpdateData(fieldData, context->clientPrimServer[srvId]); 
    312     } 
    313     if (!context->hasClient && context->hasServer) 
    314     { 
    315 //      size_t writtenSize; 
    316 //      if (this->getUseCompressedOutput()) 
    317 //        writtenSize = grid->getNumberWrittenIndexes(); 
    318 //      else 
    319 //        writtenSize = grid->getWrittenDataSize(); 
    320 // 
    321 //      CArray<double,1> fieldData(writtenSize); 
    322  
    323 //      if (this->getUseCompressedOutput()) 
    324 //        this->outputCompressedField(fieldData); 
    325 //      else 
    326 //        this->outputField(fieldData); 
    327311      writeField(); 
    328     } 
    329  
    330     lastlast_Write_srv = last_Write_srv; 
    331  
     312      lastlast_Write_srv = last_Write_srv; 
     313    } 
    332314  } 
     315   
     316//   void  CField::recvUpdateData(vector<int>& ranks, vector<CBufferIn*>& buffers) 
     317//   { 
     318//     CContext* context = CContext::getCurrent(); 
     319 
     320//     if (data_srv.empty()) 
     321//     { 
     322//       for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 
     323//       { 
     324//         int rank = it->first; 
     325//         data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements()))); 
     326//         foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank])))); 
     327//       } 
     328//     } 
     329 
     330//     const CDate& currDate = context->getCalendar()->getCurrentDate(); 
     331//     const CDate opeDate      = last_operation_srv +freq_op + freq_operation_srv - freq_op; 
     332//     const CDate writeDate    = last_Write_srv     + freq_write_srv; 
     333 
     334//     if (opeDate <= currDate) 
     335//     { 
     336//       for (int n = 0; n < ranks.size(); n++) 
     337//       { 
     338//         CArray<double,1> data_tmp; 
     339//         *buffers[n] >> data_tmp; 
     340//         (*foperation_srv[ranks[n]])(data_tmp); 
     341//       } 
     342//       last_operation_srv = currDate; 
     343//     } 
     344 
     345//     if (writeDate < (currDate + freq_operation_srv)) 
     346//     { 
     347//       for (int n = 0; n < ranks.size(); n++) 
     348//       { 
     349//         this->foperation_srv[ranks[n]]->final(); 
     350//       } 
     351 
     352//       last_Write_srv = writeDate; 
     353//     } 
     354 
     355//     if (context->hasClient && context->hasServer) 
     356//     { 
     357//       size_t writtenSize; 
     358// //      if (field->getUseCompressedOutput()) 
     359// //        writtenSize = grid->getNumberWrittenIndexes(); 
     360// //      else 
     361//         writtenSize = grid->getWrittenDataSize(); 
     362 
     363//       CArray<double,1> fieldData(writtenSize); 
     364// //      if (!field->default_value.isEmpty()) fieldData = field->default_value; 
     365 
     366// //      if (field->getUseCompressedOutput()) 
     367// //        field->outputCompressedField(fieldData); 
     368// //      else 
     369//         this->outputField(fieldData); 
     370//       sendUpdateData(fieldData); 
     371//     } 
     372//     if (!context->hasClient && context->hasServer) 
     373//     { 
     374//       writeField(); 
     375//     } 
     376 
     377//     lastlast_Write_srv = last_Write_srv; 
     378 
     379//   } 
    333380 
    334381  void CField::writeField(void) 
     
    656703 
    657704   //---------------------------------------------------------------- 
    658  
    659    void CField::solveOnlyReferenceEnabledField(bool doSending2Server) 
    660    { 
    661      CContext* context = CContext::getCurrent(); 
    662      if (!isReferenceSolved) 
    663      { 
    664         isReferenceSolved = true; 
    665  
    666         if (context->hasClient && !context->hasServer) 
    667 //        if (context->hasClient) 
    668         { 
    669           solveRefInheritance(true); 
    670           if (hasDirectFieldReference()) getDirectFieldReference()->solveOnlyReferenceEnabledField(false); 
    671         } 
    672 //        else if (context->hasServer) 
    673         if (context->hasServer) 
    674           solveServerOperation(); 
    675  
    676         solveGridReference(); 
    677  
    678         if (context->hasClient && !context->hasServer) 
    679 //       if (context->hasClient) 
    680        { 
    681          solveGenerateGrid(); 
    682          buildGridTransformationGraph(); 
    683        } 
    684      } 
    685    } 
    686705 
    687706   /*! 
     
    784803     } 
    785804   } 
    786  
     805    
     806   void CField::solveAllEnabledFields() 
     807   { 
     808     CContext* context = CContext::getCurrent(); 
     809     bool hasClient = context->hasClient; 
     810     bool hasServer = context->hasServer; 
     811 
     812     if (!isReferenceSolved) 
     813     { 
     814        isReferenceSolved = true; 
     815 
     816        if (hasClient && !hasServer) 
     817        { 
     818          solveRefInheritance(true); 
     819          if (hasDirectFieldReference()) getDirectFieldReference()->solveAllEnabledFields(); 
     820        } 
     821 
     822        if (hasServer) 
     823          solveServerOperation(); 
     824 
     825        solveGridReference(); 
     826 
     827        if (hasClient && !hasServer) 
     828       { 
     829         solveGenerateGrid(); 
     830         buildGridTransformationGraph(); 
     831       } 
     832 
     833       solveGridDomainAxisRef(false); 
     834 
     835       if (hasClient && !hasServer) 
     836       { 
     837         solveTransformedGrid(); 
     838       } 
     839 
     840       solveGridDomainAxisRef(false); 
     841     } 
     842   } 
     843 
     844   void CField::checkGridOfEnabledFields() 
     845   { 
     846      solveCheckMaskIndex(false); 
     847   } 
     848 
     849   void CField::sendGridOfEnabledFields() 
     850   { 
     851      solveGridDomainAxisRef(true); 
     852      solveCheckMaskIndex(true); 
     853   } 
     854 
     855 
     856    void CField::solveOnlyReferenceEnabledField(bool doSending2Server) 
     857   { 
     858     CContext* context = CContext::getCurrent(); 
     859     if (!isReferenceSolved) 
     860     { 
     861        isReferenceSolved = true; 
     862 
     863        if (context->hasClient && !context->hasServer) 
     864//        if (context->hasClient) 
     865        { 
     866          solveRefInheritance(true); 
     867          if (hasDirectFieldReference()) getDirectFieldReference()->solveOnlyReferenceEnabledField(false); 
     868        } 
     869//        else if (context->hasServer) 
     870        if (context->hasServer) 
     871          solveServerOperation(); 
     872 
     873        solveGridReference(); 
     874 
     875        if (context->hasClient && !context->hasServer) 
     876//       if (context->hasClient) 
     877       { 
     878         solveGenerateGrid(); 
     879         buildGridTransformationGraph(); 
     880       } 
     881     } 
     882   } 
     883      
    787884   void CField::solveAllReferenceEnabledField(bool doSending2Server) 
    788885   { 
     
    794891        areAllReferenceSolved = true; 
    795892 
    796 //        if (context->hasClient) 
     893       // if (context->hasClient) 
    797894        if (context->hasClient && !context->hasServer) 
    798895        { 
     
    800897          if (hasDirectFieldReference()) getDirectFieldReference()->solveAllReferenceEnabledField(false); 
    801898        } 
    802 //        else if (context->hasServer) 
    803         if (context->hasServer && !context->hasClient) 
     899       else if (context->hasServer) 
     900        // if (context->hasServer && !context->hasClient) 
    804901          solveServerOperation(); 
    805902 
     
    885982   void CField::buildFilterGraph(CGarbageCollector& gc, bool enableOutput) 
    886983   { 
    887      if (!areAllReferenceSolved) solveAllReferenceEnabledField(false); 
    888  
    889      // Start by building a filter which can provide the field's instant data 
    890      if (!instantDataFilter) 
    891      { 
    892        // Check if we have an expression to parse 
     984     // if (!areAllReferenceSolved) solveAllReferenceEnabledField(false); 
     985    if (!isReferenceSolved) solveAllEnabledFields(); 
     986     CContext* context = CContext::getCurrent(); 
     987     bool hasWriterServer = context->hasServer && !context->hasClient; 
     988     bool hasIntermediateServer = context->hasServer && context->hasClient; 
     989 
     990     if (hasWriterServer) 
     991     { 
     992        if (!instantDataFilter) 
     993          instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     994 
     995             // If the field data is to be read by the client or/and written to a file 
     996       if (enableOutput && !storeFilter && !fileWriterFilter) 
     997       { 
     998         if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
     999         { 
     1000           fileServerWriterFilter = boost::shared_ptr<CFileServerWriterFilter>(new CFileServerWriterFilter(gc, this)); 
     1001           instantDataFilter->connectOutput(fileServerWriterFilter, 0); 
     1002         } 
     1003       } 
     1004     } 
     1005     else if (hasIntermediateServer) 
     1006     { 
     1007       if (!instantDataFilter) 
     1008          instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     1009 
     1010             // If the field data is to be read by the client or/and written to a file 
     1011       if (enableOutput && !storeFilter && !fileWriterFilter) 
     1012       { 
     1013         if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
     1014         { 
     1015           fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 
     1016           instantDataFilter->connectOutput(fileWriterFilter, 0); 
     1017         } 
     1018       } 
     1019     } 
     1020     else 
     1021     { 
     1022       // Start by building a filter which can provide the field's instant data 
     1023       if (!instantDataFilter) 
     1024       { 
     1025         // Check if we have an expression to parse 
    8931026       if (hasExpression()) 
    894        { 
     1027         { 
    8951028         boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 
    8961029         boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 
     
    9121045 
    9131046         instantDataFilter = filter; 
    914        } 
    915        // Check if we have a reference on another field 
    916        else if (!field_ref.isEmpty()) 
    917          instantDataFilter = getFieldReference(gc); 
    918        // Check if the data is to be read from a file 
    919        else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 
     1047         } 
     1048         // Check if we have a reference on another field 
     1049         else if (!field_ref.isEmpty()) 
     1050           instantDataFilter = getFieldReference(gc); 
     1051         // Check if the data is to be read from a file 
     1052         else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 
    9201053         instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
    9211054                                                                                                     freq_offset.isEmpty() ? NoneDu : freq_offset, 
    9221055                                                                                                     true)); 
    923        else // The data might be passed from the model 
     1056         else // The data might be passed from the model 
    9241057         instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    925      } 
    926  
    927      // If the field data is to be read by the client or/and written to a file 
    928      if (enableOutput && !storeFilter && !fileWriterFilter) 
    929      { 
    930        if (!read_access.isEmpty() && read_access) 
    931        { 
    932          storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 
    933          instantDataFilter->connectOutput(storeFilter, 0); 
    934        } 
    935  
    936        if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
    937        { 
    938          fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 
    939          getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 
     1058       } 
     1059 
     1060       // If the field data is to be read by the client or/and written to a file 
     1061       if (enableOutput && !storeFilter && !fileWriterFilter) 
     1062       { 
     1063         if (!read_access.isEmpty() && read_access) 
     1064         { 
     1065           storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 
     1066           instantDataFilter->connectOutput(storeFilter, 0); 
     1067         } 
     1068 
     1069         if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
     1070         { 
     1071           fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 
     1072           getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 
     1073         } 
    9401074       } 
    9411075     } 
     
    12851419   void CField::outputField(CArray<double,1>& fieldOut) 
    12861420   { 
    1287       map<int, CArray<double,1> >::iterator it; 
    1288  
    1289       for (it = data_srv.begin(); it != data_srv.end(); it++) 
    1290       { 
    1291          grid->outputField(it->first, it->second, fieldOut.dataFirst()); 
    1292       } 
     1421      // map<int, CArray<double,1> >::iterator it; 
     1422 
     1423      // for (it = data_srv.begin(); it != data_srv.end(); it++) 
     1424      // { 
     1425      //    grid->outputField(it->first, it->second, fieldOut.dataFirst()); 
     1426      // } 
     1427    grid->outputField(recvDataSrv, fieldOut); 
    12931428   } 
    12941429 
Note: See TracChangeset for help on using the changeset viewer.