New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 9598

Last change on this file since 9598 was 9598, checked in by nicolasmartin, 6 years ago

Reorganisation plan for NEMO repository: changes to make compilation succeed with new structure
Juste one issue left with AGRIF_NORDIC with AGRIF preprocessing
Standardisation of routines header with version 4.0 and year 2018
Fix for some broken symlinks

  • Property svn:keywords set to Id
File size: 86.8 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
[2715]35   !!----------------------------------------------------------------------
[3764]36#if   defined key_mpp_mpi
[13]37   !!----------------------------------------------------------------------
[1344]38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
[2715]40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]44   !!   mpprecv       :
[9019]45   !!   mppsend       :
[2715]46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
[1344]55   !!   mpp_ini_north : initialisation of north fold
[9019]56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]57   !!----------------------------------------------------------------------
[3764]58   USE dom_oce        ! ocean space and time domain
[2715]59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
[3]61
[13]62   IMPLICIT NONE
[415]63   PRIVATE
[9019]64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
[4147]79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
[1344]80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
[9019]81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
[1344]84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[6490]85   PUBLIC   mpp_max_multiple
[3294]86   PUBLIC   mppscatter, mppgather
[4328]87   PUBLIC   mpp_ini_ice, mpp_ini_znl
[2715]88   PUBLIC   mppsize
[3764]89   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[3680]90   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d
[6490]91   PUBLIC   mpprank
[5429]92   
[13]93   !! * Interfaces
94   !! define generic interface for these routine as they are called sometimes
[1344]95   !! with scalar arguments instead of array arguments, which causes problems
96   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]97   INTERFACE mpp_min
98      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
99   END INTERFACE
100   INTERFACE mpp_max
[681]101      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]102   END INTERFACE
103   INTERFACE mpp_sum
[6140]104      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]105         &             mppsum_realdd, mppsum_a_realdd
[13]106   END INTERFACE
[1344]107   INTERFACE mpp_minloc
108      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
109   END INTERFACE
110   INTERFACE mpp_maxloc
111      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
112   END INTERFACE
[6490]113   INTERFACE mpp_max_multiple
114      MODULE PROCEDURE mppmax_real_multiple
115   END INTERFACE
116
[51]117   !! ========================= !!
118   !!  MPI  variable definition !!
119   !! ========================= !!
[1629]120!$AGRIF_DO_NOT_TREAT
[2004]121   INCLUDE 'mpif.h'
[1629]122!$AGRIF_END_DO_NOT_TREAT
[3764]123
[1344]124   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[3]125
[1344]126   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]127
[1344]128   INTEGER ::   mppsize        ! number of process
129   INTEGER ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]130!$AGRIF_DO_NOT_TREAT
[9570]131   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]132!$AGRIF_END_DO_NOT_TREAT
[3]133
[2480]134   INTEGER :: MPI_SUMDD
[1976]135
[869]136   ! variables used in case of sea-ice
[9069]137   INTEGER, PUBLIC ::   ncomm_ice       !: communicator made by the processors with sea-ice (public so that it can be freed in icethd)
[9019]138   INTEGER         ::   ngrp_iworld     !  group ID for the world processors (for rheology)
139   INTEGER         ::   ngrp_ice        !  group ID for the ice processors (for rheology)
140   INTEGER         ::   ndim_rank_ice   !  number of 'ice' processors
141   INTEGER         ::   n_ice_root      !  number (in the comm_ice) of proc 0 in the ice comm
[2715]142   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_ice     ! dimension ndim_rank_ice
[1345]143
144   ! variables used for zonal integration
145   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]146   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
147   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
148   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]149   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]150
[3764]151   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]152   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
153   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
154   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
155   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
156   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
157   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
158   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
159   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]160
[1344]161   ! Type of send : standard, buffered, immediate
[9019]162   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
163   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
164   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
[3764]165
[9019]166   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]167
[9019]168   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
169   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
170
[51]171   !!----------------------------------------------------------------------
[9598]172   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]173   !! $Id$
[9598]174   !! Software governed by the CeCILL licence     (./LICENSE)
[1344]175   !!----------------------------------------------------------------------
[3]176CONTAINS
177
[9019]178   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
[2715]179      !!----------------------------------------------------------------------
[51]180      !!                  ***  routine mynode  ***
[3764]181      !!
[51]182      !! ** Purpose :   Find processor unit
183      !!----------------------------------------------------------------------
[6140]184      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
185      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
186      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
187      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
188      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
189      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
190      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]191      !
[4147]192      INTEGER ::   mynode, ierr, code, ji, ii, ios
[532]193      LOGICAL ::   mpi_was_called
[2715]194      !
[3294]195      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
[51]196      !!----------------------------------------------------------------------
[1344]197      !
[2481]198      ii = 1
[6140]199      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
200      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
201      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
[1344]202      !
[4147]203      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
204      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
[9168]205901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
[9019]206      !
[4147]207      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
208      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
[9168]209902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
[9019]210      !
[1344]211      !                              ! control print
[6140]212      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
213      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
214      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
[9019]215      !
[2731]216#if defined key_agrif
217      IF( .NOT. Agrif_Root() ) THEN
[3764]218         jpni  = Agrif_Parent(jpni )
[2731]219         jpnj  = Agrif_Parent(jpnj )
220         jpnij = Agrif_Parent(jpnij)
221      ENDIF
222#endif
[9019]223      !
224      IF( jpnij < 1 ) THEN         ! If jpnij is not specified in namelist then we calculate it
225         jpnij = jpni * jpnj       ! this means there will be no land cutting out.
226      ENDIF
[2731]227
[9019]228      IF( jpni < 1 .OR. jpnj < 1  ) THEN
[6140]229         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
[2715]230      ELSE
[6140]231         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
232         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
233         WRITE(ldtxt(ii),*) '      number of local domains           jpnij = ',jpnij      ;   ii = ii + 1
[9019]234      ENDIF
[2715]235
[3294]236      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
237
[2480]238      CALL mpi_initialized ( mpi_was_called, code )
239      IF( code /= MPI_SUCCESS ) THEN
[3764]240         DO ji = 1, SIZE(ldtxt)
[2481]241            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]242         END DO
[2480]243         WRITE(*, cform_err)
244         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
245         CALL mpi_abort( mpi_comm_world, code, ierr )
246      ENDIF
[415]247
[2480]248      IF( mpi_was_called ) THEN
249         !
250         SELECT CASE ( cn_mpi_send )
251         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]252            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]253         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]254            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]255            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[2480]256         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]257            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[2480]258            l_isend = .TRUE.
259         CASE DEFAULT
[6140]260            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
261            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]262            kstop = kstop + 1
[2480]263         END SELECT
[9019]264         !
265      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
[6140]266         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
267         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
[2715]268         kstop = kstop + 1
[532]269      ELSE
[1601]270         SELECT CASE ( cn_mpi_send )
[524]271         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]272            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]273            CALL mpi_init( ierr )
[524]274         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]275            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]276            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[524]277         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]278            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[524]279            l_isend = .TRUE.
[2480]280            CALL mpi_init( ierr )
[524]281         CASE DEFAULT
[6140]282            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
283            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]284            kstop = kstop + 1
[524]285         END SELECT
[2480]286         !
[415]287      ENDIF
[570]288
[3764]289      IF( PRESENT(localComm) ) THEN
[2480]290         IF( Agrif_Root() ) THEN
[9570]291            mpi_comm_oce = localComm
[2480]292         ENDIF
293      ELSE
[9570]294         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
[2480]295         IF( code /= MPI_SUCCESS ) THEN
[3764]296            DO ji = 1, SIZE(ldtxt)
[2481]297               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
298            END DO
[2480]299            WRITE(*, cform_err)
300            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
301            CALL mpi_abort( mpi_comm_world, code, ierr )
302         ENDIF
[3764]303      ENDIF
[2480]304
[5656]305#if defined key_agrif
[9019]306      IF( Agrif_Root() ) THEN
[9570]307         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]308      ELSE
[9570]309         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]310      ENDIF
311#endif
312
[9570]313      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
314      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[629]315      mynode = mpprank
[4624]316
317      IF( mynode == 0 ) THEN
[5407]318         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
319         WRITE(kumond, nammpp)     
[4624]320      ENDIF
[3764]321      !
[1976]322      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
323      !
[51]324   END FUNCTION mynode
[3]325
[9019]326   !!----------------------------------------------------------------------
327   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
328   !!
329   !!   * Argument : dummy argument use in mpp_lnk_... routines
330   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
331   !!                cd_nat :   nature of array grid-points
332   !!                psgn   :   sign used across the north fold boundary
333   !!                kfld   :   optional, number of pt3d arrays
334   !!                cd_mpp :   optional, fill the overlap area only
335   !!                pval   :   optional, background value (used at closed boundaries)
336   !!----------------------------------------------------------------------
337   !
338   !                       !==  2D array and array of 2D pointer  ==!
339   !
340#  define DIM_2d
341#     define ROUTINE_LNK           mpp_lnk_2d
342#     include "mpp_lnk_generic.h90"
343#     undef ROUTINE_LNK
344#     define MULTI
345#     define ROUTINE_LNK           mpp_lnk_2d_ptr
346#     include "mpp_lnk_generic.h90"
347#     undef ROUTINE_LNK
348#     undef MULTI
349#  undef DIM_2d
350   !
351   !                       !==  3D array and array of 3D pointer  ==!
352   !
353#  define DIM_3d
354#     define ROUTINE_LNK           mpp_lnk_3d
355#     include "mpp_lnk_generic.h90"
356#     undef ROUTINE_LNK
357#     define MULTI
358#     define ROUTINE_LNK           mpp_lnk_3d_ptr
359#     include "mpp_lnk_generic.h90"
360#     undef ROUTINE_LNK
361#     undef MULTI
362#  undef DIM_3d
363   !
364   !                       !==  4D array and array of 4D pointer  ==!
365   !
366#  define DIM_4d
367#     define ROUTINE_LNK           mpp_lnk_4d
368#     include "mpp_lnk_generic.h90"
369#     undef ROUTINE_LNK
370#     define MULTI
371#     define ROUTINE_LNK           mpp_lnk_4d_ptr
372#     include "mpp_lnk_generic.h90"
373#     undef ROUTINE_LNK
374#     undef MULTI
375#  undef DIM_4d
[6140]376
[9019]377   !!----------------------------------------------------------------------
378   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
379   !!
380   !!   * Argument : dummy argument use in mpp_nfd_... routines
381   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
382   !!                cd_nat :   nature of array grid-points
383   !!                psgn   :   sign used across the north fold boundary
384   !!                kfld   :   optional, number of pt3d arrays
385   !!                cd_mpp :   optional, fill the overlap area only
386   !!                pval   :   optional, background value (used at closed boundaries)
387   !!----------------------------------------------------------------------
388   !
389   !                       !==  2D array and array of 2D pointer  ==!
390   !
391#  define DIM_2d
392#     define ROUTINE_NFD           mpp_nfd_2d
393#     include "mpp_nfd_generic.h90"
394#     undef ROUTINE_NFD
395#     define MULTI
396#     define ROUTINE_NFD           mpp_nfd_2d_ptr
397#     include "mpp_nfd_generic.h90"
398#     undef ROUTINE_NFD
399#     undef MULTI
400#  undef DIM_2d
401   !
402   !                       !==  3D array and array of 3D pointer  ==!
403   !
404#  define DIM_3d
405#     define ROUTINE_NFD           mpp_nfd_3d
406#     include "mpp_nfd_generic.h90"
407#     undef ROUTINE_NFD
408#     define MULTI
409#     define ROUTINE_NFD           mpp_nfd_3d_ptr
410#     include "mpp_nfd_generic.h90"
411#     undef ROUTINE_NFD
412#     undef MULTI
413#  undef DIM_3d
414   !
415   !                       !==  4D array and array of 4D pointer  ==!
416   !
417#  define DIM_4d
418#     define ROUTINE_NFD           mpp_nfd_4d
419#     include "mpp_nfd_generic.h90"
420#     undef ROUTINE_NFD
421#     define MULTI
422#     define ROUTINE_NFD           mpp_nfd_4d_ptr
423#     include "mpp_nfd_generic.h90"
424#     undef ROUTINE_NFD
425#     undef MULTI
426#  undef DIM_4d
[3]427
[1344]428
[9019]429   !!----------------------------------------------------------------------
430   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
431   !!
432   !!   * Argument : dummy argument use in mpp_lnk_... routines
433   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
434   !!                cd_nat :   nature of array grid-points
435   !!                psgn   :   sign used across the north fold boundary
436   !!                kb_bdy :   BDY boundary set
437   !!                kfld   :   optional, number of pt3d arrays
438   !!----------------------------------------------------------------------
439   !
440   !                       !==  2D array and array of 2D pointer  ==!
441   !
442#  define DIM_2d
443#     define ROUTINE_BDY           mpp_lnk_bdy_2d
444#     include "mpp_bdy_generic.h90"
445#     undef ROUTINE_BDY
446#  undef DIM_2d
447   !
448   !                       !==  3D array and array of 3D pointer  ==!
449   !
450#  define DIM_3d
451#     define ROUTINE_BDY           mpp_lnk_bdy_3d
452#     include "mpp_bdy_generic.h90"
453#     undef ROUTINE_BDY
454#  undef DIM_3d
455   !
456   !                       !==  4D array and array of 4D pointer  ==!
457   !
458!!#  define DIM_4d
459!!#     define ROUTINE_BDY           mpp_lnk_bdy_4d
460!!#     include "mpp_bdy_generic.h90"
461!!#     undef ROUTINE_BDY
462!!#  undef DIM_4d
[3]463
[9019]464   !!----------------------------------------------------------------------
465   !!
466   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
[5429]467   
468   
[9019]469   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
[5429]470   
[9019]471   
472   !!----------------------------------------------------------------------
[5429]473
474
[888]475
[1344]476   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]477      !!----------------------------------------------------------------------
478      !!                  ***  routine mppsend  ***
[3764]479      !!
[51]480      !! ** Purpose :   Send messag passing array
481      !!
482      !!----------------------------------------------------------------------
[1344]483      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
484      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
485      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
486      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
487      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
488      !!
489      INTEGER ::   iflag
[51]490      !!----------------------------------------------------------------------
[1344]491      !
[1601]492      SELECT CASE ( cn_mpi_send )
[300]493      CASE ( 'S' )                ! Standard mpi send (blocking)
[9570]494         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]495      CASE ( 'B' )                ! Buffer mpi send (blocking)
[9570]496         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]497      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[1344]498         ! be carefull, one more argument here : the mpi request identifier..
[9570]499         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[300]500      END SELECT
[1344]501      !
[51]502   END SUBROUTINE mppsend
[3]503
504
[3294]505   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]506      !!----------------------------------------------------------------------
507      !!                  ***  routine mpprecv  ***
508      !!
509      !! ** Purpose :   Receive messag passing array
510      !!
511      !!----------------------------------------------------------------------
[1344]512      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
513      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
514      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]515      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]516      !!
[51]517      INTEGER :: istatus(mpi_status_size)
518      INTEGER :: iflag
[3294]519      INTEGER :: use_source
[1344]520      !!----------------------------------------------------------------------
521      !
[3764]522      ! If a specific process number has been passed to the receive call,
[3294]523      ! use that one. Default is to use mpi_any_source
[6140]524      use_source = mpi_any_source
525      IF( PRESENT(ksource) )   use_source = ksource
526      !
[9570]527      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[1344]528      !
[51]529   END SUBROUTINE mpprecv
[3]530
531
[51]532   SUBROUTINE mppgather( ptab, kp, pio )
533      !!----------------------------------------------------------------------
534      !!                   ***  routine mppgather  ***
[3764]535      !!
536      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]537      !!     array which is distributed following the vertical level.
538      !!
[1344]539      !!----------------------------------------------------------------------
[6140]540      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
541      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]542      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]543      !!
[1344]544      INTEGER :: itaille, ierror   ! temporary integer
[51]545      !!---------------------------------------------------------------------
[1344]546      !
547      itaille = jpi * jpj
548      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]549         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[1344]550      !
[51]551   END SUBROUTINE mppgather
[3]552
553
[51]554   SUBROUTINE mppscatter( pio, kp, ptab )
555      !!----------------------------------------------------------------------
556      !!                  ***  routine mppscatter  ***
557      !!
[3764]558      !! ** Purpose :   Transfert between awork array which is distributed
[51]559      !!      following the vertical level and the local subdomain array.
560      !!
561      !!----------------------------------------------------------------------
[6140]562      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
563      INTEGER                             ::   kp     ! Tag (not used with MPI
564      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]565      !!
566      INTEGER :: itaille, ierror   ! temporary integer
[51]567      !!---------------------------------------------------------------------
[1344]568      !
[6140]569      itaille = jpi * jpj
[1344]570      !
571      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]572         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[1344]573      !
[51]574   END SUBROUTINE mppscatter
[3]575
[9019]576   !!----------------------------------------------------------------------
577   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
578   !!   
579   !!----------------------------------------------------------------------
580   !!
[869]581   SUBROUTINE mppmax_a_int( ktab, kdim, kcom )
[681]582      !!----------------------------------------------------------------------
[1344]583      INTEGER , INTENT(in   )                  ::   kdim   ! size of array
584      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
[3764]585      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom   !
[9019]586      INTEGER :: ierror, ilocalcomm   ! temporary integer
[681]587      INTEGER, DIMENSION(kdim) ::   iwork
[1344]588      !!----------------------------------------------------------------------
[9570]589      ilocalcomm = mpi_comm_oce
[9019]590      IF( PRESENT(kcom) )   ilocalcomm = kcom
591      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_max, ilocalcomm, ierror )
[681]592      ktab(:) = iwork(:)
593   END SUBROUTINE mppmax_a_int
[9019]594   !!
[869]595   SUBROUTINE mppmax_int( ktab, kcom )
[681]596      !!----------------------------------------------------------------------
[6140]597      INTEGER, INTENT(inout)           ::   ktab   ! ???
598      INTEGER, INTENT(in   ), OPTIONAL ::   kcom   ! ???
[9019]599      INTEGER ::   ierror, iwork, ilocalcomm   ! temporary integer
[1344]600      !!----------------------------------------------------------------------
[9570]601      ilocalcomm = mpi_comm_oce
[9019]602      IF( PRESENT(kcom) )   ilocalcomm = kcom
603      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_max, ilocalcomm, ierror )
[681]604      ktab = iwork
605   END SUBROUTINE mppmax_int
[9019]606   !!
607   SUBROUTINE mppmax_a_real( ptab, kdim, kcom )
608      !!----------------------------------------------------------------------
609      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab
610      INTEGER                  , INTENT(in   ) ::   kdim
611      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom
612      INTEGER :: ierror, ilocalcomm
613      REAL(wp), DIMENSION(kdim) ::  zwork
614      !!----------------------------------------------------------------------
[9570]615      ilocalcomm = mpi_comm_oce
[9019]616      IF( PRESENT(kcom) )   ilocalcomm = kcom
617      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
618      ptab(:) = zwork(:)
619   END SUBROUTINE mppmax_a_real
620   !!
621   SUBROUTINE mppmax_real( ptab, kcom )
622      !!----------------------------------------------------------------------
623      REAL(wp), INTENT(inout)           ::   ptab   ! ???
624      INTEGER , INTENT(in   ), OPTIONAL ::   kcom   ! ???
625      INTEGER  ::   ierror, ilocalcomm
626      REAL(wp) ::   zwork
627      !!----------------------------------------------------------------------
[9570]628      ilocalcomm = mpi_comm_oce
[9019]629      IF( PRESENT(kcom) )   ilocalcomm = kcom!
630      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_max, ilocalcomm, ierror )
631      ptab = zwork
632   END SUBROUTINE mppmax_real
[681]633
634
[9019]635   !!----------------------------------------------------------------------
636   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
637   !!   
638   !!----------------------------------------------------------------------
639   !!
[869]640   SUBROUTINE mppmin_a_int( ktab, kdim, kcom )
[51]641      !!----------------------------------------------------------------------
[6140]642      INTEGER , INTENT( in  )                  ::   kdim   ! size of array
643      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
644      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom   ! input array
[1344]645      !!
[9019]646      INTEGER ::   ierror, ilocalcomm   ! temporary integer
[51]647      INTEGER, DIMENSION(kdim) ::   iwork
[1344]648      !!----------------------------------------------------------------------
[9570]649      ilocalcomm = mpi_comm_oce
[9019]650      IF( PRESENT(kcom) )   ilocalcomm = kcom
651      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_min, ilocalcomm, ierror )
[51]652      ktab(:) = iwork(:)
653   END SUBROUTINE mppmin_a_int
[9019]654   !!
[1345]655   SUBROUTINE mppmin_int( ktab, kcom )
[51]656      !!----------------------------------------------------------------------
657      INTEGER, INTENT(inout) ::   ktab      ! ???
[1345]658      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom        ! input array
[1344]659      !!
[9019]660      INTEGER ::  ierror, iwork, ilocalcomm
[1344]661      !!----------------------------------------------------------------------
[9570]662      ilocalcomm = mpi_comm_oce
[9019]663      IF( PRESENT(kcom) )   ilocalcomm = kcom
664      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_min, ilocalcomm, ierror )
[51]665      ktab = iwork
666   END SUBROUTINE mppmin_int
[9019]667   !!
668   SUBROUTINE mppmin_a_real( ptab, kdim, kcom )
669      !!----------------------------------------------------------------------
670      INTEGER , INTENT(in   )                  ::   kdim
671      REAL(wp), INTENT(inout), DIMENSION(kdim) ::   ptab
672      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom
673      INTEGER :: ierror, ilocalcomm
674      REAL(wp), DIMENSION(kdim) ::   zwork
675      !!-----------------------------------------------------------------------
[9570]676      ilocalcomm = mpi_comm_oce
[9019]677      IF( PRESENT(kcom) )   ilocalcomm = kcom
678      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_min, ilocalcomm, ierror )
679      ptab(:) = zwork(:)
680   END SUBROUTINE mppmin_a_real
681   !!
682   SUBROUTINE mppmin_real( ptab, kcom )
683      !!-----------------------------------------------------------------------
684      REAL(wp), INTENT(inout)           ::   ptab        !
685      INTEGER , INTENT(in   ), OPTIONAL :: kcom
686      INTEGER  ::   ierror, ilocalcomm
687      REAL(wp) ::   zwork
688      !!-----------------------------------------------------------------------
[9570]689      ilocalcomm = mpi_comm_oce
[9019]690      IF( PRESENT(kcom) )   ilocalcomm = kcom
691      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_min, ilocalcomm, ierror )
692      ptab = zwork
693   END SUBROUTINE mppmin_real
[3]694
[13]695
[9019]696   !!----------------------------------------------------------------------
697   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
698   !!   
699   !!   Global sum of 1D array or a variable (integer, real or complex)
700   !!----------------------------------------------------------------------
701   !!
[51]702   SUBROUTINE mppsum_a_int( ktab, kdim )
703      !!----------------------------------------------------------------------
[6140]704      INTEGER, INTENT(in   )                   ::   kdim   ! ???
705      INTEGER, INTENT(inout), DIMENSION (kdim) ::   ktab   ! ???
[51]706      INTEGER :: ierror
707      INTEGER, DIMENSION (kdim) ::  iwork
[1344]708      !!----------------------------------------------------------------------
[9570]709      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_sum, mpi_comm_oce, ierror )
[51]710      ktab(:) = iwork(:)
711   END SUBROUTINE mppsum_a_int
[9019]712   !!
[1344]713   SUBROUTINE mppsum_int( ktab )
714      !!----------------------------------------------------------------------
715      INTEGER, INTENT(inout) ::   ktab
716      INTEGER :: ierror, iwork
717      !!----------------------------------------------------------------------
[9570]718      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_sum, mpi_comm_oce, ierror )
[1344]719      ktab = iwork
720   END SUBROUTINE mppsum_int
[9019]721   !!
[1344]722   SUBROUTINE mppsum_a_real( ptab, kdim, kcom )
723      !!-----------------------------------------------------------------------
[9019]724      INTEGER                  , INTENT(in   ) ::   kdim   ! size of ptab
725      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab   ! input array
726      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! specific communicator
727      INTEGER  ::   ierror, ilocalcomm    ! local integer
728      REAL(wp) ::   zwork(kdim)           ! local workspace
[1344]729      !!-----------------------------------------------------------------------
[9570]730      ilocalcomm = mpi_comm_oce
[9019]731      IF( PRESENT(kcom) )   ilocalcomm = kcom
732      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
[1344]733      ptab(:) = zwork(:)
734   END SUBROUTINE mppsum_a_real
[9019]735   !!
[1344]736   SUBROUTINE mppsum_real( ptab, kcom )
737      !!-----------------------------------------------------------------------
[9019]738      REAL(wp)          , INTENT(inout)           ::   ptab   ! input scalar
739      INTEGER , OPTIONAL, INTENT(in   ) ::   kcom
740      INTEGER  ::   ierror, ilocalcomm
[1344]741      REAL(wp) ::   zwork
742      !!-----------------------------------------------------------------------
[9570]743      ilocalcomm = mpi_comm_oce
[9019]744      IF( PRESENT(kcom) )   ilocalcomm = kcom
745      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
[1344]746      ptab = zwork
747   END SUBROUTINE mppsum_real
[9019]748   !!
[1976]749   SUBROUTINE mppsum_realdd( ytab, kcom )
750      !!-----------------------------------------------------------------------
[9019]751      COMPLEX(wp)          , INTENT(inout) ::   ytab    ! input scalar
752      INTEGER    , OPTIONAL, INTENT(in   ) ::   kcom
753      INTEGER     ::   ierror, ilocalcomm
[6140]754      COMPLEX(wp) ::   zwork
755      !!-----------------------------------------------------------------------
[9570]756      ilocalcomm = mpi_comm_oce
[9019]757      IF( PRESENT(kcom) )   ilocalcomm = kcom
758      CALL MPI_ALLREDUCE( ytab, zwork, 1, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
[1976]759      ytab = zwork
760   END SUBROUTINE mppsum_realdd
[9019]761   !!
[1976]762   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
763      !!----------------------------------------------------------------------
[6140]764      INTEGER                     , INTENT(in   ) ::   kdim   ! size of ytab
765      COMPLEX(wp), DIMENSION(kdim), INTENT(inout) ::   ytab   ! input array
766      INTEGER    , OPTIONAL       , INTENT(in   ) ::   kcom
[9019]767      INTEGER:: ierror, ilocalcomm    ! local integer
[1976]768      COMPLEX(wp), DIMENSION(kdim) :: zwork     ! temporary workspace
[6140]769      !!-----------------------------------------------------------------------
[9570]770      ilocalcomm = mpi_comm_oce
[9019]771      IF( PRESENT(kcom) )   ilocalcomm = kcom
772      CALL MPI_ALLREDUCE( ytab, zwork, kdim, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
[1976]773      ytab(:) = zwork(:)
774   END SUBROUTINE mppsum_a_realdd
[9019]775   
[3764]776
[9019]777   SUBROUTINE mppmax_real_multiple( pt1d, kdim, kcom  )
778      !!----------------------------------------------------------------------
779      !!                  ***  routine mppmax_real  ***
780      !!
781      !! ** Purpose :   Maximum across processor of each element of a 1D arrays
782      !!
783      !!----------------------------------------------------------------------
784      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   pt1d   ! 1D arrays
785      INTEGER                  , INTENT(in   ) ::   kdim
786      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! local communicator
787      !!
788      INTEGER  ::   ierror, ilocalcomm
789      REAL(wp), DIMENSION(kdim) ::  zwork
790      !!----------------------------------------------------------------------
[9570]791      ilocalcomm = mpi_comm_oce
[9019]792      IF( PRESENT(kcom) )   ilocalcomm = kcom
793      !
794      CALL mpi_allreduce( pt1d, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
795      pt1d(:) = zwork(:)
796      !
797   END SUBROUTINE mppmax_real_multiple
[6140]798
[9019]799
[1344]800   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki,kj )
801      !!------------------------------------------------------------------------
802      !!             ***  routine mpp_minloc  ***
803      !!
804      !! ** Purpose :   Compute the global minimum of an array ptab
805      !!              and also give its global position
806      !!
807      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
808      !!
809      !!--------------------------------------------------------------------------
810      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab    ! Local 2D array
811      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask   ! Local mask
812      REAL(wp)                     , INTENT(  out) ::   pmin    ! Global minimum of ptab
[9019]813      INTEGER                      , INTENT(  out) ::   ki, kj  ! index of minimum in global frame
[6140]814      !
815      INTEGER :: ierror
[1344]816      INTEGER , DIMENSION(2)   ::   ilocs
817      REAL(wp) ::   zmin   ! local minimum
818      REAL(wp), DIMENSION(2,1) ::   zain, zaout
819      !!-----------------------------------------------------------------------
820      !
[9019]821      zmin  = MINVAL( ptab(:,:) , mask= pmask == 1._wp )
822      ilocs = MINLOC( ptab(:,:) , mask= pmask == 1._wp )
[1344]823      !
824      ki = ilocs(1) + nimpp - 1
825      kj = ilocs(2) + njmpp - 1
826      !
827      zain(1,:)=zmin
828      zain(2,:)=ki+10000.*kj
829      !
[9570]830      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OCE,ierror)
[1344]831      !
832      pmin = zaout(1,1)
833      kj = INT(zaout(2,1)/10000.)
834      ki = INT(zaout(2,1) - 10000.*kj )
835      !
836   END SUBROUTINE mpp_minloc2d
[13]837
[3]838
[1344]839   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj ,kk)
840      !!------------------------------------------------------------------------
841      !!             ***  routine mpp_minloc  ***
842      !!
843      !! ** Purpose :   Compute the global minimum of an array ptab
844      !!              and also give its global position
845      !!
846      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
847      !!
848      !!--------------------------------------------------------------------------
[9019]849      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
850      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   pmask        ! Local mask
851      REAL(wp)                  , INTENT(  out) ::   pmin         ! Global minimum of ptab
852      INTEGER                   , INTENT(  out) ::   ki, kj, kk   ! index of minimum in global frame
853      !
[1344]854      INTEGER  ::   ierror
855      REAL(wp) ::   zmin     ! local minimum
856      INTEGER , DIMENSION(3)   ::   ilocs
857      REAL(wp), DIMENSION(2,1) ::   zain, zaout
858      !!-----------------------------------------------------------------------
859      !
[9019]860      zmin  = MINVAL( ptab(:,:,:) , mask= pmask == 1._wp )
861      ilocs = MINLOC( ptab(:,:,:) , mask= pmask == 1._wp )
[1344]862      !
863      ki = ilocs(1) + nimpp - 1
864      kj = ilocs(2) + njmpp - 1
865      kk = ilocs(3)
866      !
[9019]867      zain(1,:) = zmin
868      zain(2,:) = ki + 10000.*kj + 100000000.*kk
[1344]869      !
[9570]870      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OCE,ierror)
[1344]871      !
872      pmin = zaout(1,1)
873      kk   = INT( zaout(2,1) / 100000000. )
874      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
875      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
876      !
877   END SUBROUTINE mpp_minloc3d
[13]878
[3]879
[1344]880   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
881      !!------------------------------------------------------------------------
882      !!             ***  routine mpp_maxloc  ***
883      !!
884      !! ** Purpose :   Compute the global maximum of an array ptab
885      !!              and also give its global position
886      !!
887      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
888      !!
889      !!--------------------------------------------------------------------------
890      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab     ! Local 2D array
891      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask    ! Local mask
892      REAL(wp)                     , INTENT(  out) ::   pmax     ! Global maximum of ptab
893      INTEGER                      , INTENT(  out) ::   ki, kj   ! index of maximum in global frame
[3764]894      !!
[1344]895      INTEGER  :: ierror
896      INTEGER, DIMENSION (2)   ::   ilocs
897      REAL(wp) :: zmax   ! local maximum
898      REAL(wp), DIMENSION(2,1) ::   zain, zaout
899      !!-----------------------------------------------------------------------
900      !
[9019]901      zmax  = MAXVAL( ptab(:,:) , mask= pmask == 1._wp )
902      ilocs = MAXLOC( ptab(:,:) , mask= pmask == 1._wp )
[1344]903      !
904      ki = ilocs(1) + nimpp - 1
905      kj = ilocs(2) + njmpp - 1
906      !
907      zain(1,:) = zmax
908      zain(2,:) = ki + 10000. * kj
909      !
[9570]910      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OCE,ierror)
[1344]911      !
912      pmax = zaout(1,1)
913      kj   = INT( zaout(2,1) / 10000.     )
914      ki   = INT( zaout(2,1) - 10000.* kj )
915      !
916   END SUBROUTINE mpp_maxloc2d
[3]917
[13]918
[1344]919   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
920      !!------------------------------------------------------------------------
921      !!             ***  routine mpp_maxloc  ***
922      !!
923      !! ** Purpose :  Compute the global maximum of an array ptab
924      !!              and also give its global position
925      !!
926      !! ** Method : Use MPI_ALLREDUCE with MPI_MINLOC
927      !!
928      !!--------------------------------------------------------------------------
[9019]929      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
930      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   pmask        ! Local mask
931      REAL(wp)                   , INTENT(  out) ::   pmax         ! Global maximum of ptab
932      INTEGER                    , INTENT(  out) ::   ki, kj, kk   ! index of maximum in global frame
933      !
934      INTEGER  ::   ierror   ! local integer
935      REAL(wp) ::   zmax     ! local maximum
[1344]936      REAL(wp), DIMENSION(2,1) ::   zain, zaout
937      INTEGER , DIMENSION(3)   ::   ilocs
938      !!-----------------------------------------------------------------------
939      !
[9019]940      zmax  = MAXVAL( ptab(:,:,:) , mask= pmask == 1._wp )
941      ilocs = MAXLOC( ptab(:,:,:) , mask= pmask == 1._wp )
[1344]942      !
943      ki = ilocs(1) + nimpp - 1
944      kj = ilocs(2) + njmpp - 1
945      kk = ilocs(3)
946      !
[9019]947      zain(1,:) = zmax
948      zain(2,:) = ki + 10000.*kj + 100000000.*kk
[1344]949      !
[9570]950      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OCE,ierror)
[1344]951      !
952      pmax = zaout(1,1)
953      kk   = INT( zaout(2,1) / 100000000. )
954      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
955      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
956      !
957   END SUBROUTINE mpp_maxloc3d
[3]958
[869]959
[1344]960   SUBROUTINE mppsync()
961      !!----------------------------------------------------------------------
962      !!                  ***  routine mppsync  ***
[3764]963      !!
[1344]964      !! ** Purpose :   Massively parallel processors, synchroneous
965      !!
966      !!-----------------------------------------------------------------------
967      INTEGER :: ierror
968      !!-----------------------------------------------------------------------
969      !
[9570]970      CALL mpi_barrier( mpi_comm_oce, ierror )
[1344]971      !
972   END SUBROUTINE mppsync
[3]973
974
[1344]975   SUBROUTINE mppstop
976      !!----------------------------------------------------------------------
977      !!                  ***  routine mppstop  ***
[3764]978      !!
[3294]979      !! ** purpose :   Stop massively parallel processors method
[1344]980      !!
981      !!----------------------------------------------------------------------
982      INTEGER ::   info
983      !!----------------------------------------------------------------------
984      !
985      CALL mppsync
986      CALL mpi_finalize( info )
987      !
988   END SUBROUTINE mppstop
[3]989
990
[1344]991   SUBROUTINE mpp_comm_free( kcom )
992      !!----------------------------------------------------------------------
993      INTEGER, INTENT(in) ::   kcom
994      !!
995      INTEGER :: ierr
996      !!----------------------------------------------------------------------
997      !
998      CALL MPI_COMM_FREE(kcom, ierr)
999      !
1000   END SUBROUTINE mpp_comm_free
[3]1001
[869]1002
[2715]1003   SUBROUTINE mpp_ini_ice( pindic, kumout )
[1344]1004      !!----------------------------------------------------------------------
1005      !!               ***  routine mpp_ini_ice  ***
1006      !!
1007      !! ** Purpose :   Initialize special communicator for ice areas
1008      !!      condition together with global variables needed in the ddmpp folding
1009      !!
1010      !! ** Method  : - Look for ice processors in ice routines
1011      !!              - Put their number in nrank_ice
1012      !!              - Create groups for the world processors and the ice processors
1013      !!              - Create a communicator for ice processors
1014      !!
1015      !! ** output
1016      !!      njmppmax = njmpp for northern procs
1017      !!      ndim_rank_ice = number of processors with ice
1018      !!      nrank_ice (ndim_rank_ice) = ice processors
[3625]1019      !!      ngrp_iworld = group ID for the world processors
[1344]1020      !!      ngrp_ice = group ID for the ice processors
1021      !!      ncomm_ice = communicator for the ice procs.
1022      !!      n_ice_root = number (in the world) of proc 0 in the ice comm.
1023      !!
1024      !!----------------------------------------------------------------------
[2715]1025      INTEGER, INTENT(in) ::   pindic
1026      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical unit
[1344]1027      !!
1028      INTEGER :: jjproc
[2715]1029      INTEGER :: ii, ierr
1030      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kice
1031      INTEGER, ALLOCATABLE, DIMENSION(:) ::   zwork
[1344]1032      !!----------------------------------------------------------------------
1033      !
[2715]1034      ALLOCATE( kice(jpnij), zwork(jpnij), STAT=ierr )
1035      IF( ierr /= 0 ) THEN
1036         WRITE(kumout, cform_err)
1037         WRITE(kumout,*) 'mpp_ini_ice : failed to allocate 2, 1D arrays (jpnij in length)'
1038         CALL mppstop
1039      ENDIF
1040
[1344]1041      ! Look for how many procs with sea-ice
1042      !
1043      kice = 0
1044      DO jjproc = 1, jpnij
[3764]1045         IF( jjproc == narea .AND. pindic .GT. 0 )   kice(jjproc) = 1
[1344]1046      END DO
1047      !
1048      zwork = 0
[9570]1049      CALL MPI_ALLREDUCE( kice, zwork, jpnij, mpi_integer, mpi_sum, mpi_comm_oce, ierr )
[3764]1050      ndim_rank_ice = SUM( zwork )
[3]1051
[1344]1052      ! Allocate the right size to nrank_north
[1441]1053      IF( ALLOCATED ( nrank_ice ) )   DEALLOCATE( nrank_ice )
[1344]1054      ALLOCATE( nrank_ice(ndim_rank_ice) )
1055      !
[3764]1056      ii = 0
[1344]1057      nrank_ice = 0
1058      DO jjproc = 1, jpnij
1059         IF( zwork(jjproc) == 1) THEN
1060            ii = ii + 1
[3764]1061            nrank_ice(ii) = jjproc -1
[1344]1062         ENDIF
1063      END DO
[1208]1064
[1344]1065      ! Create the world group
[9570]1066      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_iworld, ierr )
[869]1067
[1344]1068      ! Create the ice group from the world group
[3625]1069      CALL MPI_GROUP_INCL( ngrp_iworld, ndim_rank_ice, nrank_ice, ngrp_ice, ierr )
[869]1070
[1344]1071      ! Create the ice communicator , ie the pool of procs with sea-ice
[9570]1072      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_ice, ncomm_ice, ierr )
[869]1073
[1344]1074      ! Find proc number in the world of proc 0 in the north
1075      ! The following line seems to be useless, we just comment & keep it as reminder
[3625]1076      ! CALL MPI_GROUP_TRANSLATE_RANKS(ngrp_ice,1,0,ngrp_iworld,n_ice_root,ierr)
[1344]1077      !
[3625]1078      CALL MPI_GROUP_FREE(ngrp_ice, ierr)
1079      CALL MPI_GROUP_FREE(ngrp_iworld, ierr)
1080
[2715]1081      DEALLOCATE(kice, zwork)
1082      !
[1344]1083   END SUBROUTINE mpp_ini_ice
[869]1084
1085
[2715]1086   SUBROUTINE mpp_ini_znl( kumout )
[1345]1087      !!----------------------------------------------------------------------
1088      !!               ***  routine mpp_ini_znl  ***
1089      !!
1090      !! ** Purpose :   Initialize special communicator for computing zonal sum
1091      !!
1092      !! ** Method  : - Look for processors in the same row
1093      !!              - Put their number in nrank_znl
1094      !!              - Create group for the znl processors
1095      !!              - Create a communicator for znl processors
1096      !!              - Determine if processor should write znl files
1097      !!
1098      !! ** output
1099      !!      ndim_rank_znl = number of processors on the same row
1100      !!      ngrp_znl = group ID for the znl processors
1101      !!      ncomm_znl = communicator for the ice procs.
1102      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1103      !!
1104      !!----------------------------------------------------------------------
[2715]1105      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]1106      !
[2715]1107      INTEGER :: jproc      ! dummy loop integer
1108      INTEGER :: ierr, ii   ! local integer
1109      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1110      !!----------------------------------------------------------------------
[1345]1111      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1112      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]1113      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]1114      !
[2715]1115      ALLOCATE( kwork(jpnij), STAT=ierr )
1116      IF( ierr /= 0 ) THEN
1117         WRITE(kumout, cform_err)
1118         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
1119         CALL mppstop
1120      ENDIF
1121
1122      IF( jpnj == 1 ) THEN
[1345]1123         ngrp_znl  = ngrp_world
[9570]1124         ncomm_znl = mpi_comm_oce
[1345]1125      ELSE
1126         !
[9570]1127         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]1128         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1129         !-$$        CALL flush(numout)
1130         !
1131         ! Count number of processors on the same row
1132         ndim_rank_znl = 0
1133         DO jproc=1,jpnij
1134            IF ( kwork(jproc) == njmpp ) THEN
1135               ndim_rank_znl = ndim_rank_znl + 1
1136            ENDIF
1137         END DO
1138         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1139         !-$$        CALL flush(numout)
1140         ! Allocate the right size to nrank_znl
[1441]1141         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]1142         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]1143         ii = 0
[1345]1144         nrank_znl (:) = 0
1145         DO jproc=1,jpnij
1146            IF ( kwork(jproc) == njmpp) THEN
1147               ii = ii + 1
[3764]1148               nrank_znl(ii) = jproc -1
[1345]1149            ENDIF
1150         END DO
1151         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1152         !-$$        CALL flush(numout)
1153
1154         ! Create the opa group
[9570]1155         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]1156         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1157         !-$$        CALL flush(numout)
1158
1159         ! Create the znl group from the opa group
1160         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1161         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1162         !-$$        CALL flush(numout)
1163
1164         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]1165         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]1166         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1167         !-$$        CALL flush(numout)
1168         !
1169      END IF
1170
1171      ! Determines if processor if the first (starting from i=1) on the row
[3764]1172      IF ( jpni == 1 ) THEN
[1345]1173         l_znl_root = .TRUE.
1174      ELSE
1175         l_znl_root = .FALSE.
1176         kwork (1) = nimpp
1177         CALL mpp_min ( kwork(1), kcom = ncomm_znl)
1178         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1179      END IF
1180
[2715]1181      DEALLOCATE(kwork)
1182
[1345]1183   END SUBROUTINE mpp_ini_znl
1184
1185
[1344]1186   SUBROUTINE mpp_ini_north
1187      !!----------------------------------------------------------------------
1188      !!               ***  routine mpp_ini_north  ***
1189      !!
[3764]1190      !! ** Purpose :   Initialize special communicator for north folding
[1344]1191      !!      condition together with global variables needed in the mpp folding
1192      !!
1193      !! ** Method  : - Look for northern processors
1194      !!              - Put their number in nrank_north
1195      !!              - Create groups for the world processors and the north processors
1196      !!              - Create a communicator for northern processors
1197      !!
1198      !! ** output
1199      !!      njmppmax = njmpp for northern procs
1200      !!      ndim_rank_north = number of processors in the northern line
1201      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1202      !!      ngrp_world = group ID for the world processors
1203      !!      ngrp_north = group ID for the northern processors
1204      !!      ncomm_north = communicator for the northern procs.
1205      !!      north_root = number (in the world) of proc 0 in the northern comm.
1206      !!
1207      !!----------------------------------------------------------------------
1208      INTEGER ::   ierr
1209      INTEGER ::   jjproc
1210      INTEGER ::   ii, ji
1211      !!----------------------------------------------------------------------
1212      !
1213      njmppmax = MAXVAL( njmppt )
1214      !
1215      ! Look for how many procs on the northern boundary
1216      ndim_rank_north = 0
1217      DO jjproc = 1, jpnij
1218         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1219      END DO
1220      !
1221      ! Allocate the right size to nrank_north
[1441]1222      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]1223      ALLOCATE( nrank_north(ndim_rank_north) )
[869]1224
[1344]1225      ! Fill the nrank_north array with proc. number of northern procs.
1226      ! Note : the rank start at 0 in MPI
1227      ii = 0
1228      DO ji = 1, jpnij
1229         IF ( njmppt(ji) == njmppmax   ) THEN
1230            ii=ii+1
1231            nrank_north(ii)=ji-1
1232         END IF
1233      END DO
1234      !
1235      ! create the world group
[9570]1236      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]1237      !
1238      ! Create the North group from the world group
1239      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1240      !
1241      ! Create the North communicator , ie the pool of procs in the north group
[9570]1242      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]1243      !
1244   END SUBROUTINE mpp_ini_north
[869]1245
1246
[9570]1247   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
[1344]1248      !!---------------------------------------------------------------------
1249      !!                   ***  routine mpp_init.opa  ***
1250      !!
1251      !! ** Purpose :: export and attach a MPI buffer for bsend
1252      !!
1253      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1254      !!            but classical mpi_init
[3764]1255      !!
1256      !! History :: 01/11 :: IDRIS initial version for IBM only
[1344]1257      !!            08/04 :: R. Benshila, generalisation
1258      !!---------------------------------------------------------------------
[3764]1259      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
[2481]1260      INTEGER                      , INTENT(inout) ::   ksft
1261      INTEGER                      , INTENT(  out) ::   code
1262      INTEGER                                      ::   ierr, ji
1263      LOGICAL                                      ::   mpi_was_called
[1344]1264      !!---------------------------------------------------------------------
1265      !
1266      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
[532]1267      IF ( code /= MPI_SUCCESS ) THEN
[3764]1268         DO ji = 1, SIZE(ldtxt)
[2481]1269            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]1270         END DO
[2481]1271         WRITE(*, cform_err)
1272         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
[1344]1273         CALL mpi_abort( mpi_comm_world, code, ierr )
[532]1274      ENDIF
[1344]1275      !
1276      IF( .NOT. mpi_was_called ) THEN
1277         CALL mpi_init( code )
[9570]1278         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
[532]1279         IF ( code /= MPI_SUCCESS ) THEN
[3764]1280            DO ji = 1, SIZE(ldtxt)
[2481]1281               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1282            END DO
1283            WRITE(*, cform_err)
1284            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
[532]1285            CALL mpi_abort( mpi_comm_world, code, ierr )
1286         ENDIF
1287      ENDIF
[1344]1288      !
[897]1289      IF( nn_buffer > 0 ) THEN
[2481]1290         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
[897]1291         ! Buffer allocation and attachment
[2481]1292         ALLOCATE( tampon(nn_buffer), stat = ierr )
[3764]1293         IF( ierr /= 0 ) THEN
1294            DO ji = 1, SIZE(ldtxt)
[2481]1295               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1296            END DO
1297            WRITE(*, cform_err)
1298            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1299            CALL mpi_abort( mpi_comm_world, code, ierr )
1300         END IF
1301         CALL mpi_buffer_attach( tampon, nn_buffer, code )
[897]1302      ENDIF
[1344]1303      !
[9570]1304   END SUBROUTINE mpi_init_oce
[3]1305
[9019]1306
1307   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1308      !!---------------------------------------------------------------------
1309      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1310      !!
1311      !!   Modification of original codes written by David H. Bailey
1312      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1313      !!---------------------------------------------------------------------
[9019]1314      INTEGER                     , INTENT(in)    ::   ilen, itype
1315      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1316      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1317      !
1318      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]1319      INTEGER  :: ji, ztmp           ! local scalar
1320      !!---------------------------------------------------------------------
1321      !
[1976]1322      ztmp = itype   ! avoid compilation warning
[9019]1323      !
[1976]1324      DO ji=1,ilen
1325      ! Compute ydda + yddb using Knuth's trick.
1326         zt1  = real(ydda(ji)) + real(yddb(ji))
1327         zerr = zt1 - real(ydda(ji))
1328         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1329                + aimag(ydda(ji)) + aimag(yddb(ji))
1330
1331         ! The result is zt1 + zt2, after normalization.
1332         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1333      END DO
[9019]1334      !
[1976]1335   END SUBROUTINE DDPDD_MPI
1336
[6140]1337
[9019]1338   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
[4990]1339      !!---------------------------------------------------------------------
1340      !!                   ***  routine mpp_lbc_north_icb  ***
1341      !!
1342      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1343      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1344      !!              array with outer extra halo
1345      !!
1346      !! ** Method  :   North fold condition and mpp with more than one proc
1347      !!              in i-direction require a specific treatment. We gather
[9019]1348      !!              the 4+kextj northern lines of the global domain on 1
[4990]1349      !!              processor and apply lbc north-fold on this sub array.
1350      !!              Then we scatter the north fold array back to the processors.
[9019]1351      !!              This routine accounts for an extra halo with icebergs
1352      !!              and assumes ghost rows and columns have been suppressed.
[4990]1353      !!
1354      !!----------------------------------------------------------------------
1355      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1356      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1357      !                                                     !   = T ,  U , V , F or W -points
1358      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1359      !!                                                    ! north fold, =  1. otherwise
[9019]1360      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
[6140]1361      !
[4990]1362      INTEGER ::   ji, jj, jr
1363      INTEGER ::   ierr, itaille, ildi, ilei, iilb
[9019]1364      INTEGER ::   ipj, ij, iproc
[4990]1365      !
1366      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1367      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1368      !!----------------------------------------------------------------------
1369      !
[9019]1370      ipj=4
[9467]1371      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1372     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1373     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
[4990]1374      !
[9019]1375      ztab_e(:,:)      = 0._wp
1376      znorthloc_e(:,:) = 0._wp
[6140]1377      !
[9467]1378      ij = 1 - kextj
1379      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1380      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1381         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
[4990]1382         ij = ij + 1
1383      END DO
1384      !
[9467]1385      itaille = jpimax * ( ipj + 2*kextj )
1386      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1387         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1388         &                ncomm_north, ierr )
[4990]1389      !
1390      DO jr = 1, ndim_rank_north            ! recover the global north array
1391         iproc = nrank_north(jr) + 1
1392         ildi = nldit (iproc)
1393         ilei = nleit (iproc)
1394         iilb = nimppt(iproc)
[9467]1395         DO jj = 1-kextj, ipj+kextj
[4990]1396            DO ji = ildi, ilei
1397               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1398            END DO
1399         END DO
1400      END DO
1401
1402      ! 2. North-Fold boundary conditions
1403      ! ----------------------------------
[9467]1404      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
[4990]1405
[9467]1406      ij = 1 - kextj
[4990]1407      !! Scatter back to pt2d
[9467]1408      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
[9019]1409         DO ji= 1, jpi
[4990]1410            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1411         END DO
[9467]1412         ij  = ij +1
[4990]1413      END DO
1414      !
1415      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1416      !
1417   END SUBROUTINE mpp_lbc_north_icb
1418
[6140]1419
[9019]1420   SUBROUTINE mpp_lnk_2d_icb( pt2d, cd_type, psgn, kexti, kextj )
[4990]1421      !!----------------------------------------------------------------------
1422      !!                  ***  routine mpp_lnk_2d_icb  ***
1423      !!
[9019]1424      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1425      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1426      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
[4990]1427      !!
1428      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1429      !!      between processors following neighboring subdomains.
1430      !!            domain parameters
[9019]1431      !!                    jpi    : first dimension of the local subdomain
1432      !!                    jpj    : second dimension of the local subdomain
1433      !!                    kexti  : number of columns for extra outer halo
1434      !!                    kextj  : number of rows for extra outer halo
[4990]1435      !!                    nbondi : mark for "east-west local boundary"
1436      !!                    nbondj : mark for "north-south local boundary"
1437      !!                    noea   : number for local neighboring processors
1438      !!                    nowe   : number for local neighboring processors
1439      !!                    noso   : number for local neighboring processors
1440      !!                    nono   : number for local neighboring processors
1441      !!----------------------------------------------------------------------
[9019]1442      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1443      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1444      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1445      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1446      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1447      !
[4990]1448      INTEGER  ::   jl   ! dummy loop indices
[9019]1449      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1450      INTEGER  ::   ipreci, iprecj             !   -       -
[4990]1451      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1452      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1453      !!
[9019]1454      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1455      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
[4990]1456      !!----------------------------------------------------------------------
1457
[9019]1458      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1459      iprecj = nn_hls + kextj
[4990]1460
1461
1462      ! 1. standard boundary treatment
1463      ! ------------------------------
1464      ! Order matters Here !!!!
1465      !
1466      !                                      ! East-West boundaries
1467      !                                           !* Cyclic east-west
1468      IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
[9019]1469         pt2d(1-kexti:     1    ,:) = pt2d(jpim1-kexti:  jpim1 ,:)       ! east
1470         pt2d(   jpi  :jpi+kexti,:) = pt2d(     2      :2+kexti,:)       ! west
[4990]1471         !
1472      ELSE                                        !* closed
[9019]1473         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls    ,:) = 0._wp    ! south except at F-point
1474                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! north
[4990]1475      ENDIF
1476      !
1477
1478      ! north fold treatment
1479      ! -----------------------
1480      IF( npolj /= 0 ) THEN
1481         !
1482         SELECT CASE ( jpni )
[9019]1483                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1484                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
[4990]1485         END SELECT
1486         !
1487      ENDIF
1488
1489      ! 2. East and west directions exchange
1490      ! ------------------------------------
1491      ! we play with the neigbours AND the row number because of the periodicity
1492      !
1493      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1494      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
[9019]1495         iihom = jpi-nreci-kexti
[4990]1496         DO jl = 1, ipreci
[9019]1497            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
[4990]1498            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1499         END DO
1500      END SELECT
1501      !
1502      !                           ! Migrations
[9019]1503      imigr = ipreci * ( jpj + 2*kextj )
[4990]1504      !
1505      SELECT CASE ( nbondi )
1506      CASE ( -1 )
[9019]1507         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1508         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
[4990]1509         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1510      CASE ( 0 )
[9019]1511         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1512         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1513         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1514         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1515         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1516         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1517      CASE ( 1 )
[9019]1518         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1519         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1520         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1521      END SELECT
1522      !
1523      !                           ! Write Dirichlet lateral conditions
[9019]1524      iihom = jpi - nn_hls
[4990]1525      !
1526      SELECT CASE ( nbondi )
1527      CASE ( -1 )
1528         DO jl = 1, ipreci
1529            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1530         END DO
1531      CASE ( 0 )
1532         DO jl = 1, ipreci
[9019]1533            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1534            pt2d(iihom+jl,:) = r2dew(:,jl,2)
[4990]1535         END DO
1536      CASE ( 1 )
1537         DO jl = 1, ipreci
[9019]1538            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
[4990]1539         END DO
1540      END SELECT
1541
1542
1543      ! 3. North and south directions
1544      ! -----------------------------
1545      ! always closed : we play only with the neigbours
1546      !
1547      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
[9019]1548         ijhom = jpj-nrecj-kextj
[4990]1549         DO jl = 1, iprecj
1550            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
[9019]1551            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
[4990]1552         END DO
1553      ENDIF
1554      !
1555      !                           ! Migrations
[9019]1556      imigr = iprecj * ( jpi + 2*kexti )
[4990]1557      !
1558      SELECT CASE ( nbondj )
1559      CASE ( -1 )
[9019]1560         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1561         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
[4990]1562         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1563      CASE ( 0 )
[9019]1564         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1565         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1566         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1567         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1568         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1569         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1570      CASE ( 1 )
[9019]1571         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1572         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1573         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1574      END SELECT
1575      !
1576      !                           ! Write Dirichlet lateral conditions
[9019]1577      ijhom = jpj - nn_hls
[4990]1578      !
1579      SELECT CASE ( nbondj )
1580      CASE ( -1 )
1581         DO jl = 1, iprecj
1582            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1583         END DO
1584      CASE ( 0 )
1585         DO jl = 1, iprecj
[9019]1586            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1587            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
[4990]1588         END DO
1589      CASE ( 1 )
1590         DO jl = 1, iprecj
[9019]1591            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
[4990]1592         END DO
1593      END SELECT
[9019]1594      !
[4990]1595   END SUBROUTINE mpp_lnk_2d_icb
[6140]1596   
[13]1597#else
1598   !!----------------------------------------------------------------------
1599   !!   Default case:            Dummy module        share memory computing
1600   !!----------------------------------------------------------------------
[2715]1601   USE in_out_manager
[1976]1602
[13]1603   INTERFACE mpp_sum
[3294]1604      MODULE PROCEDURE mpp_sum_a2s, mpp_sum_as, mpp_sum_ai, mpp_sum_s, mpp_sum_i, mppsum_realdd, mppsum_a_realdd
[13]1605   END INTERFACE
1606   INTERFACE mpp_max
[681]1607      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]1608   END INTERFACE
1609   INTERFACE mpp_min
1610      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1611   END INTERFACE
[1344]1612   INTERFACE mpp_minloc
1613      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1614   END INTERFACE
1615   INTERFACE mpp_maxloc
1616      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1617   END INTERFACE
[9019]1618   INTERFACE mpp_max_multiple
1619      MODULE PROCEDURE mppmax_real_multiple
1620   END INTERFACE
[3]1621
[13]1622   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
[4147]1623   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
[869]1624   INTEGER :: ncomm_ice
[9570]1625   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
[2715]1626   !!----------------------------------------------------------------------
[13]1627CONTAINS
[3]1628
[2715]1629   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1630      INTEGER, INTENT(in) ::   kumout
1631      lib_mpp_alloc = 0
1632   END FUNCTION lib_mpp_alloc
1633
[5407]1634   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
[1579]1635      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
[3764]1636      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
[5407]1637      CHARACTER(len=*) ::   ldname
[4314]1638      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
[9570]1639      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
[5412]1640      function_value = 0
[1579]1641      IF( .FALSE. )   ldtxt(:) = 'never done'
[5407]1642      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
[13]1643   END FUNCTION mynode
[3]1644
[13]1645   SUBROUTINE mppsync                       ! Dummy routine
1646   END SUBROUTINE mppsync
[3]1647
[869]1648   SUBROUTINE mpp_sum_as( parr, kdim, kcom )      ! Dummy routine
[13]1649      REAL   , DIMENSION(:) :: parr
1650      INTEGER               :: kdim
[3764]1651      INTEGER, OPTIONAL     :: kcom
[869]1652      WRITE(*,*) 'mpp_sum_as: You should not have seen this print! error?', kdim, parr(1), kcom
[13]1653   END SUBROUTINE mpp_sum_as
[3]1654
[869]1655   SUBROUTINE mpp_sum_a2s( parr, kdim, kcom )      ! Dummy routine
[13]1656      REAL   , DIMENSION(:,:) :: parr
1657      INTEGER               :: kdim
[3764]1658      INTEGER, OPTIONAL     :: kcom
[869]1659      WRITE(*,*) 'mpp_sum_a2s: You should not have seen this print! error?', kdim, parr(1,1), kcom
[13]1660   END SUBROUTINE mpp_sum_a2s
[3]1661
[869]1662   SUBROUTINE mpp_sum_ai( karr, kdim, kcom )      ! Dummy routine
[13]1663      INTEGER, DIMENSION(:) :: karr
1664      INTEGER               :: kdim
[3764]1665      INTEGER, OPTIONAL     :: kcom
[869]1666      WRITE(*,*) 'mpp_sum_ai: You should not have seen this print! error?', kdim, karr(1), kcom
[13]1667   END SUBROUTINE mpp_sum_ai
[3]1668
[869]1669   SUBROUTINE mpp_sum_s( psca, kcom )            ! Dummy routine
[13]1670      REAL                  :: psca
[3764]1671      INTEGER, OPTIONAL     :: kcom
[869]1672      WRITE(*,*) 'mpp_sum_s: You should not have seen this print! error?', psca, kcom
[13]1673   END SUBROUTINE mpp_sum_s
[2480]1674
[869]1675   SUBROUTINE mpp_sum_i( kint, kcom )            ! Dummy routine
[13]1676      integer               :: kint
[3764]1677      INTEGER, OPTIONAL     :: kcom
[869]1678      WRITE(*,*) 'mpp_sum_i: You should not have seen this print! error?', kint, kcom
[13]1679   END SUBROUTINE mpp_sum_i
1680
[3294]1681   SUBROUTINE mppsum_realdd( ytab, kcom )
1682      COMPLEX(wp), INTENT(inout)         :: ytab    ! input scalar
1683      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1684      WRITE(*,*) 'mppsum_realdd: You should not have seen this print! error?', ytab
1685   END SUBROUTINE mppsum_realdd
[3764]1686
[3294]1687   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
1688      INTEGER , INTENT( in )                        ::   kdim      ! size of ytab
1689      COMPLEX(wp), DIMENSION(kdim), INTENT( inout ) ::   ytab      ! input array
1690      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1691      WRITE(*,*) 'mppsum_a_realdd: You should not have seen this print! error?', kdim, ytab(1), kcom
1692   END SUBROUTINE mppsum_a_realdd
1693
[869]1694   SUBROUTINE mppmax_a_real( parr, kdim, kcom )
[13]1695      REAL   , DIMENSION(:) :: parr
1696      INTEGER               :: kdim
[3764]1697      INTEGER, OPTIONAL     :: kcom
[869]1698      WRITE(*,*) 'mppmax_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
[13]1699   END SUBROUTINE mppmax_a_real
1700
[869]1701   SUBROUTINE mppmax_real( psca, kcom )
[13]1702      REAL                  :: psca
[3764]1703      INTEGER, OPTIONAL     :: kcom
[869]1704      WRITE(*,*) 'mppmax_real: You should not have seen this print! error?', psca, kcom
[13]1705   END SUBROUTINE mppmax_real
1706
[869]1707   SUBROUTINE mppmin_a_real( parr, kdim, kcom )
[13]1708      REAL   , DIMENSION(:) :: parr
1709      INTEGER               :: kdim
[3764]1710      INTEGER, OPTIONAL     :: kcom
[869]1711      WRITE(*,*) 'mppmin_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
[13]1712   END SUBROUTINE mppmin_a_real
1713
[869]1714   SUBROUTINE mppmin_real( psca, kcom )
[13]1715      REAL                  :: psca
[3764]1716      INTEGER, OPTIONAL     :: kcom
[869]1717      WRITE(*,*) 'mppmin_real: You should not have seen this print! error?', psca, kcom
[13]1718   END SUBROUTINE mppmin_real
1719
[869]1720   SUBROUTINE mppmax_a_int( karr, kdim ,kcom)
[681]1721      INTEGER, DIMENSION(:) :: karr
1722      INTEGER               :: kdim
[3764]1723      INTEGER, OPTIONAL     :: kcom
[888]1724      WRITE(*,*) 'mppmax_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
[681]1725   END SUBROUTINE mppmax_a_int
1726
[869]1727   SUBROUTINE mppmax_int( kint, kcom)
[681]1728      INTEGER               :: kint
[3764]1729      INTEGER, OPTIONAL     :: kcom
[869]1730      WRITE(*,*) 'mppmax_int: You should not have seen this print! error?', kint, kcom
[681]1731   END SUBROUTINE mppmax_int
1732
[869]1733   SUBROUTINE mppmin_a_int( karr, kdim, kcom )
[13]1734      INTEGER, DIMENSION(:) :: karr
1735      INTEGER               :: kdim
[3764]1736      INTEGER, OPTIONAL     :: kcom
[869]1737      WRITE(*,*) 'mppmin_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
[13]1738   END SUBROUTINE mppmin_a_int
1739
[869]1740   SUBROUTINE mppmin_int( kint, kcom )
[13]1741      INTEGER               :: kint
[3764]1742      INTEGER, OPTIONAL     :: kcom
[869]1743      WRITE(*,*) 'mppmin_int: You should not have seen this print! error?', kint, kcom
[13]1744   END SUBROUTINE mppmin_int
1745
[1344]1746   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki, kj )
[181]1747      REAL                   :: pmin
1748      REAL , DIMENSION (:,:) :: ptab, pmask
1749      INTEGER :: ki, kj
[1528]1750      WRITE(*,*) 'mpp_minloc2d: You should not have seen this print! error?', pmin, ki, kj, ptab(1,1), pmask(1,1)
[181]1751   END SUBROUTINE mpp_minloc2d
1752
[1344]1753   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj, kk )
[181]1754      REAL                     :: pmin
1755      REAL , DIMENSION (:,:,:) :: ptab, pmask
1756      INTEGER :: ki, kj, kk
[1528]1757      WRITE(*,*) 'mpp_minloc3d: You should not have seen this print! error?', pmin, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
[181]1758   END SUBROUTINE mpp_minloc3d
1759
[1344]1760   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
[181]1761      REAL                   :: pmax
1762      REAL , DIMENSION (:,:) :: ptab, pmask
1763      INTEGER :: ki, kj
[1528]1764      WRITE(*,*) 'mpp_maxloc2d: You should not have seen this print! error?', pmax, ki, kj, ptab(1,1), pmask(1,1)
[181]1765   END SUBROUTINE mpp_maxloc2d
1766
[1344]1767   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
[181]1768      REAL                     :: pmax
1769      REAL , DIMENSION (:,:,:) :: ptab, pmask
1770      INTEGER :: ki, kj, kk
[1528]1771      WRITE(*,*) 'mpp_maxloc3d: You should not have seen this print! error?', pmax, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
[181]1772   END SUBROUTINE mpp_maxloc3d
1773
[51]1774   SUBROUTINE mppstop
[3799]1775      STOP      ! non MPP case, just stop the run
[51]1776   END SUBROUTINE mppstop
1777
[2715]1778   SUBROUTINE mpp_ini_ice( kcom, knum )
1779      INTEGER :: kcom, knum
1780      WRITE(*,*) 'mpp_ini_ice: You should not have seen this print! error?', kcom, knum
[888]1781   END SUBROUTINE mpp_ini_ice
[869]1782
[2715]1783   SUBROUTINE mpp_ini_znl( knum )
1784      INTEGER :: knum
1785      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
[1345]1786   END SUBROUTINE mpp_ini_znl
1787
[1344]1788   SUBROUTINE mpp_comm_free( kcom )
[869]1789      INTEGER :: kcom
[1344]1790      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
[869]1791   END SUBROUTINE mpp_comm_free
[9019]1792   
1793   SUBROUTINE mppmax_real_multiple( ptab, kdim , kcom  )
1794      REAL, DIMENSION(:) ::   ptab   !
1795      INTEGER            ::   kdim   !
1796      INTEGER, OPTIONAL  ::   kcom   !
1797      WRITE(*,*) 'mppmax_real_multiple: You should not have seen this print! error?', ptab(1), kdim
1798   END SUBROUTINE mppmax_real_multiple
1799
[3]1800#endif
[2715]1801
[13]1802   !!----------------------------------------------------------------------
[4147]1803   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
[2715]1804   !!----------------------------------------------------------------------
1805
1806   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1807      &                 cd6, cd7, cd8, cd9, cd10 )
1808      !!----------------------------------------------------------------------
1809      !!                  ***  ROUTINE  stop_opa  ***
1810      !!
[3764]1811      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1812      !!                increment the error number (nstop) by one.
1813      !!----------------------------------------------------------------------
1814      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1815      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1816      !!----------------------------------------------------------------------
1817      !
[3764]1818      nstop = nstop + 1
[2715]1819      IF(lwp) THEN
1820         WRITE(numout,cform_err)
1821         IF( PRESENT(cd1 ) )   WRITE(numout,*) cd1
1822         IF( PRESENT(cd2 ) )   WRITE(numout,*) cd2
1823         IF( PRESENT(cd3 ) )   WRITE(numout,*) cd3
1824         IF( PRESENT(cd4 ) )   WRITE(numout,*) cd4
1825         IF( PRESENT(cd5 ) )   WRITE(numout,*) cd5
1826         IF( PRESENT(cd6 ) )   WRITE(numout,*) cd6
1827         IF( PRESENT(cd7 ) )   WRITE(numout,*) cd7
1828         IF( PRESENT(cd8 ) )   WRITE(numout,*) cd8
1829         IF( PRESENT(cd9 ) )   WRITE(numout,*) cd9
1830         IF( PRESENT(cd10) )   WRITE(numout,*) cd10
1831      ENDIF
1832                               CALL FLUSH(numout    )
1833      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1834      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1835      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1836      !
1837      IF( cd1 == 'STOP' ) THEN
1838         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1839         CALL mppstop()
1840      ENDIF
1841      !
1842   END SUBROUTINE ctl_stop
1843
1844
1845   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1846      &                 cd6, cd7, cd8, cd9, cd10 )
1847      !!----------------------------------------------------------------------
1848      !!                  ***  ROUTINE  stop_warn  ***
1849      !!
[3764]1850      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1851      !!                increment the warning number (nwarn) by one.
1852      !!----------------------------------------------------------------------
1853      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1854      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1855      !!----------------------------------------------------------------------
[3764]1856      !
1857      nwarn = nwarn + 1
[2715]1858      IF(lwp) THEN
1859         WRITE(numout,cform_war)
1860         IF( PRESENT(cd1 ) ) WRITE(numout,*) cd1
1861         IF( PRESENT(cd2 ) ) WRITE(numout,*) cd2
1862         IF( PRESENT(cd3 ) ) WRITE(numout,*) cd3
1863         IF( PRESENT(cd4 ) ) WRITE(numout,*) cd4
1864         IF( PRESENT(cd5 ) ) WRITE(numout,*) cd5
1865         IF( PRESENT(cd6 ) ) WRITE(numout,*) cd6
1866         IF( PRESENT(cd7 ) ) WRITE(numout,*) cd7
1867         IF( PRESENT(cd8 ) ) WRITE(numout,*) cd8
1868         IF( PRESENT(cd9 ) ) WRITE(numout,*) cd9
1869         IF( PRESENT(cd10) ) WRITE(numout,*) cd10
1870      ENDIF
1871      CALL FLUSH(numout)
1872      !
1873   END SUBROUTINE ctl_warn
1874
1875
1876   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1877      !!----------------------------------------------------------------------
1878      !!                  ***  ROUTINE ctl_opn  ***
1879      !!
1880      !! ** Purpose :   Open file and check if required file is available.
1881      !!
1882      !! ** Method  :   Fortan open
1883      !!----------------------------------------------------------------------
1884      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1885      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1886      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1887      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1888      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1889      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1890      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1891      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1892      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1893      !
[2715]1894      CHARACTER(len=80) ::   clfile
1895      INTEGER           ::   iost
1896      !!----------------------------------------------------------------------
[5836]1897      !
[2715]1898      ! adapt filename
1899      ! ----------------
1900      clfile = TRIM(cdfile)
1901      IF( PRESENT( karea ) ) THEN
1902         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1903      ENDIF
1904#if defined key_agrif
1905      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1906      knum=Agrif_Get_Unit()
1907#else
1908      knum=get_unit()
1909#endif
[5836]1910      !
[2715]1911      iost=0
1912      IF( cdacce(1:6) == 'DIRECT' )  THEN
1913         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
1914      ELSE
1915         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
1916      ENDIF
1917      IF( iost == 0 ) THEN
1918         IF(ldwp) THEN
1919            WRITE(kout,*) '     file   : ', clfile,' open ok'
1920            WRITE(kout,*) '     unit   = ', knum
1921            WRITE(kout,*) '     status = ', cdstat
1922            WRITE(kout,*) '     form   = ', cdform
1923            WRITE(kout,*) '     access = ', cdacce
1924            WRITE(kout,*)
1925         ENDIF
1926      ENDIF
1927100   CONTINUE
1928      IF( iost /= 0 ) THEN
1929         IF(ldwp) THEN
1930            WRITE(kout,*)
1931            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
1932            WRITE(kout,*) ' =======   ===  '
1933            WRITE(kout,*) '           unit   = ', knum
1934            WRITE(kout,*) '           status = ', cdstat
1935            WRITE(kout,*) '           form   = ', cdform
1936            WRITE(kout,*) '           access = ', cdacce
1937            WRITE(kout,*) '           iostat = ', iost
1938            WRITE(kout,*) '           we stop. verify the file '
1939            WRITE(kout,*)
[9438]1940         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1941            WRITE(*,*)
1942            WRITE(*,*) ' ===>>>> : bad opening file: ', clfile
1943            WRITE(*,*) ' =======   ===  '
1944            WRITE(*,*) '           unit   = ', knum
1945            WRITE(*,*) '           status = ', cdstat
1946            WRITE(*,*) '           form   = ', cdform
1947            WRITE(*,*) '           access = ', cdacce
1948            WRITE(*,*) '           iostat = ', iost
1949            WRITE(*,*) '           we stop. verify the file '
1950            WRITE(*,*)
[2715]1951         ENDIF
[9019]1952         CALL FLUSH( kout ) 
[2715]1953         STOP 'ctl_opn bad opening'
1954      ENDIF
[5836]1955      !
[2715]1956   END SUBROUTINE ctl_opn
1957
[5836]1958
[4147]1959   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1960      !!----------------------------------------------------------------------
1961      !!                  ***  ROUTINE ctl_nam  ***
1962      !!
1963      !! ** Purpose :   Informations when error while reading a namelist
1964      !!
1965      !! ** Method  :   Fortan open
1966      !!----------------------------------------------------------------------
[5836]1967      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1968      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
[7646]1969      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
[5836]1970      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
[4147]1971      !!----------------------------------------------------------------------
[5836]1972      !
[7646]1973      WRITE (clios, '(I5.0)')   kios
[4147]1974      IF( kios < 0 ) THEN         
[5836]1975         CALL ctl_warn( 'end of record or file while reading namelist '   &
1976            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1977      ENDIF
[5836]1978      !
[4147]1979      IF( kios > 0 ) THEN
[5836]1980         CALL ctl_stop( 'misspelled variable in namelist '   &
1981            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1982      ENDIF
1983      kios = 0
1984      RETURN
[5836]1985      !
[4147]1986   END SUBROUTINE ctl_nam
1987
[5836]1988
[2715]1989   INTEGER FUNCTION get_unit()
1990      !!----------------------------------------------------------------------
1991      !!                  ***  FUNCTION  get_unit  ***
1992      !!
1993      !! ** Purpose :   return the index of an unused logical unit
1994      !!----------------------------------------------------------------------
[3764]1995      LOGICAL :: llopn
[2715]1996      !!----------------------------------------------------------------------
1997      !
1998      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1999      llopn = .TRUE.
2000      DO WHILE( (get_unit < 998) .AND. llopn )
2001         get_unit = get_unit + 1
2002         INQUIRE( unit = get_unit, opened = llopn )
2003      END DO
2004      IF( (get_unit == 999) .AND. llopn ) THEN
2005         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2006         get_unit = -1
2007      ENDIF
2008      !
2009   END FUNCTION get_unit
2010
2011   !!----------------------------------------------------------------------
[3]2012END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.