Ignore:
Timestamp:
12/05/16 17:47:54 (7 years ago)
Author:
oabramkina
Message:

First working version with compression by secondary servers.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/field.cpp

    r987 r1009  
    123123    CContext* context = CContext::getCurrent(); 
    124124//    CContextClient* client = context->client; 
    125     CContextClient* client = (!context->hasServer) ? context->client : context->clientPrimServer; 
     125    int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1; 
     126    for (int i = 0; i < nbSrvPools; ++i) 
     127    { 
     128      CContextClient* client = (!context->hasServer) ? context->client : context->clientPrimServer[i]; 
     129 
     130      CEventClient event(getType(), EVENT_ID_UPDATE_DATA); 
     131 
     132      map<int, CArray<int,1> >::iterator it; 
     133      list<CMessage> list_msg; 
     134      list<CArray<double,1> > list_data; 
     135 
     136      if (!grid->doGridHaveDataDistributed()) 
     137      { 
     138         if (client->isServerLeader()) 
     139         { 
     140            for (it = grid->storeIndex_toSrv.begin(); it != grid->storeIndex_toSrv.end(); it++) 
     141            { 
     142              int rank = it->first; 
     143              CArray<int,1>& index = it->second; 
     144 
     145              list_msg.push_back(CMessage()); 
     146              list_data.push_back(CArray<double,1>(index.numElements())); 
     147 
     148              CArray<double,1>& data_tmp = list_data.back(); 
     149              for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 
     150 
     151              list_msg.back() << getId() << data_tmp; 
     152              event.push(rank, 1, list_msg.back()); 
     153            } 
     154            client->sendEvent(event); 
     155         } 
     156         else client->sendEvent(event); 
     157      } 
     158      else 
     159      { 
     160        for (it = grid->storeIndex_toSrv.begin(); it != grid->storeIndex_toSrv.end(); it++) 
     161        { 
     162          int rank = it->first; 
     163          CArray<int,1>& index = it->second; 
     164 
     165          list_msg.push_back(CMessage()); 
     166          list_data.push_back(CArray<double,1>(index.numElements())); 
     167 
     168          CArray<double,1>& data_tmp = list_data.back(); 
     169          for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 
     170 
     171          list_msg.back() << getId() << data_tmp; 
     172          event.push(rank, grid->nbSenders[rank], list_msg.back()); 
     173        } 
     174        client->sendEvent(event); 
     175      } 
     176    } 
     177 
     178    CTimer::get("XIOS Send Data").suspend(); 
     179  } 
     180 
     181  void CField::sendUpdateData(const CArray<double,1>& data, const int srvPool) 
     182  { 
     183    CTimer::get("XIOS Send Data").resume(); 
     184 
     185    CContext* context = CContext::getCurrent(); 
     186    CContextClient* client = context->clientPrimServer[srvPool]; 
    126187 
    127188    CEventClient event(getType(), EVENT_ID_UPDATE_DATA); 
     
    150211          } 
    151212          client->sendEvent(event); 
    152        }  
     213       } 
    153214       else client->sendEvent(event); 
    154215    } 
     
    171232      client->sendEvent(event); 
    172233    } 
    173  
    174234    CTimer::get("XIOS Send Data").suspend(); 
    175235  } 
     
    248308//      else 
    249309        this->outputField(fieldData); 
    250       sendUpdateData(fieldData); 
     310//        sendUpdateData(fieldData); 
     311        // Redirecting data to the correct secondary server 
     312        int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 
     313        int srvId = fileIdx % context->clientPrimServer.size(); 
     314        sendUpdateData(fieldData, srvId); 
    251315    } 
    252316    if (!context->hasClient && context->hasServer) 
    253317    { 
     318//      size_t writtenSize; 
     319//      if (this->getUseCompressedOutput()) 
     320//        writtenSize = grid->getNumberWrittenIndexes(); 
     321//      else 
     322//        writtenSize = grid->getWrittenDataSize(); 
     323// 
     324//      CArray<double,1> fieldData(writtenSize); 
     325 
     326//      if (this->getUseCompressedOutput()) 
     327//        this->outputCompressedField(fieldData); 
     328//      else 
     329//        this->outputField(fieldData); 
    254330      writeField(); 
    255331    } 
     
    12491325   } 
    12501326 
     1327   void CField::sendAddAllVariables(const int srvPool) 
     1328   { 
     1329     std::vector<CVariable*> allVar = getAllVariables(); 
     1330     std::vector<CVariable*>::const_iterator it = allVar.begin(); 
     1331     std::vector<CVariable*>::const_iterator itE = allVar.end(); 
     1332 
     1333     for (; it != itE; ++it) 
     1334     { 
     1335       this->sendAddVariable((*it)->getId()); 
     1336       (*it)->sendAllAttributesToServer(srvPool); 
     1337       (*it)->sendValue(srvPool); 
     1338     } 
     1339   } 
     1340 
    12511341   void CField::sendAddVariable(const string& id) 
    12521342   { 
Note: See TracChangeset for help on using the changeset viewer.