New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10407

Last change on this file since 10407 was 10407, checked in by smasson, 5 years ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: send back error code, see #2181

  • Property svn:keywords set to Id
File size: 88.0 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
[2715]35   !!----------------------------------------------------------------------
[3764]36#if   defined key_mpp_mpi
[13]37   !!----------------------------------------------------------------------
[1344]38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
[2715]40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]44   !!   mpprecv       :
[9019]45   !!   mppsend       :
[2715]46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
[1344]55   !!   mpp_ini_north : initialisation of north fold
[9019]56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]57   !!----------------------------------------------------------------------
[3764]58   USE dom_oce        ! ocean space and time domain
[2715]59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
[3]61
[13]62   IMPLICIT NONE
[415]63   PRIVATE
[9019]64
65   INTERFACE mpp_nfd
[10368]66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
[9019]67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
[10368]71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
[9019]72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
[4147]79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
[1344]80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
[9019]81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
[1344]84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10397]85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]86   PUBLIC   mppscatter, mppgather
[10180]87   PUBLIC   mpp_ini_znl
[3764]88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[9890]89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
[5429]90   
[13]91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
[1344]93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
[681]99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]100   END INTERFACE
101   INTERFACE mpp_sum
[6140]102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]103         &             mppsum_realdd, mppsum_a_realdd
[13]104   END INTERFACE
[1344]105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
[6490]111
[51]112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
[1629]115!$AGRIF_DO_NOT_TREAT
[2004]116   INCLUDE 'mpif.h'
[1629]117!$AGRIF_END_DO_NOT_TREAT
[3764]118
[1344]119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[3]120
[1344]121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]122
[10330]123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]125!$AGRIF_DO_NOT_TREAT
[9570]126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]127!$AGRIF_END_DO_NOT_TREAT
[3]128
[2480]129   INTEGER :: MPI_SUMDD
[1976]130
[1345]131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]137
[3764]138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]147
[1344]148   ! Type of send : standard, buffered, immediate
[9019]149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
[3764]152
[10170]153   ! Communications summary report
[10297]154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
[10170]156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
[10297]157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
[10314]159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
[10170]160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10314]161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
[10297]162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
[10386]165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
[10397]166   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
167   !: name (used as id) of allreduce-delayed operations
168   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb' /)
169   !: component name where the allreduce-delayed operation is performed
170   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
171   TYPE, PUBLIC ::   DELAYARR
172      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
173      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
174   END TYPE DELAYARR
175   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
176   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
[10297]177
[10172]178   ! timing summary report
[10300]179   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
180   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
[10172]181   
[9019]182   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]183
[9019]184   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
185   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
186
[51]187   !!----------------------------------------------------------------------
[9598]188   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]189   !! $Id$
[10068]190   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]191   !!----------------------------------------------------------------------
[3]192CONTAINS
193
[9019]194   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
[2715]195      !!----------------------------------------------------------------------
[51]196      !!                  ***  routine mynode  ***
[3764]197      !!
[51]198      !! ** Purpose :   Find processor unit
199      !!----------------------------------------------------------------------
[6140]200      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
201      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
202      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
203      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
204      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
205      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
206      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]207      !
[4147]208      INTEGER ::   mynode, ierr, code, ji, ii, ios
[532]209      LOGICAL ::   mpi_was_called
[2715]210      !
[3294]211      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
[51]212      !!----------------------------------------------------------------------
[1344]213      !
[2481]214      ii = 1
[6140]215      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
216      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
217      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
[1344]218      !
[4147]219      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
220      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
[9168]221901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
[9019]222      !
[4147]223      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
224      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
[9168]225902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
[9019]226      !
[1344]227      !                              ! control print
[6140]228      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
229      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
230      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
[9019]231      !
232      IF( jpni < 1 .OR. jpnj < 1  ) THEN
[6140]233         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
[2715]234      ELSE
[6140]235         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
236         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
[9019]237      ENDIF
[2715]238
[3294]239      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
240
[2480]241      CALL mpi_initialized ( mpi_was_called, code )
242      IF( code /= MPI_SUCCESS ) THEN
[3764]243         DO ji = 1, SIZE(ldtxt)
[2481]244            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]245         END DO
[2480]246         WRITE(*, cform_err)
247         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
248         CALL mpi_abort( mpi_comm_world, code, ierr )
249      ENDIF
[415]250
[2480]251      IF( mpi_was_called ) THEN
252         !
253         SELECT CASE ( cn_mpi_send )
254         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]255            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]256         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]257            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]258            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[2480]259         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]260            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[2480]261            l_isend = .TRUE.
262         CASE DEFAULT
[6140]263            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
264            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]265            kstop = kstop + 1
[2480]266         END SELECT
[9019]267         !
268      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
[6140]269         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
270         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
[2715]271         kstop = kstop + 1
[532]272      ELSE
[1601]273         SELECT CASE ( cn_mpi_send )
[524]274         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]275            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]276            CALL mpi_init( ierr )
[524]277         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]278            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]279            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[524]280         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]281            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[524]282            l_isend = .TRUE.
[2480]283            CALL mpi_init( ierr )
[524]284         CASE DEFAULT
[6140]285            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
286            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]287            kstop = kstop + 1
[524]288         END SELECT
[2480]289         !
[415]290      ENDIF
[570]291
[3764]292      IF( PRESENT(localComm) ) THEN
[2480]293         IF( Agrif_Root() ) THEN
[9570]294            mpi_comm_oce = localComm
[2480]295         ENDIF
296      ELSE
[9570]297         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
[2480]298         IF( code /= MPI_SUCCESS ) THEN
[3764]299            DO ji = 1, SIZE(ldtxt)
[2481]300               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
301            END DO
[2480]302            WRITE(*, cform_err)
303            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
304            CALL mpi_abort( mpi_comm_world, code, ierr )
305         ENDIF
[3764]306      ENDIF
[2480]307
[5656]308#if defined key_agrif
[9019]309      IF( Agrif_Root() ) THEN
[9570]310         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]311      ELSE
[9570]312         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]313      ENDIF
314#endif
315
[9570]316      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
317      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[629]318      mynode = mpprank
[4624]319
320      IF( mynode == 0 ) THEN
[5407]321         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
322         WRITE(kumond, nammpp)     
[4624]323      ENDIF
[3764]324      !
[1976]325      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
326      !
[51]327   END FUNCTION mynode
[3]328
[9019]329   !!----------------------------------------------------------------------
330   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
331   !!
332   !!   * Argument : dummy argument use in mpp_lnk_... routines
333   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
334   !!                cd_nat :   nature of array grid-points
335   !!                psgn   :   sign used across the north fold boundary
336   !!                kfld   :   optional, number of pt3d arrays
337   !!                cd_mpp :   optional, fill the overlap area only
338   !!                pval   :   optional, background value (used at closed boundaries)
339   !!----------------------------------------------------------------------
340   !
341   !                       !==  2D array and array of 2D pointer  ==!
342   !
343#  define DIM_2d
344#     define ROUTINE_LNK           mpp_lnk_2d
345#     include "mpp_lnk_generic.h90"
346#     undef ROUTINE_LNK
347#     define MULTI
348#     define ROUTINE_LNK           mpp_lnk_2d_ptr
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     undef MULTI
352#  undef DIM_2d
353   !
354   !                       !==  3D array and array of 3D pointer  ==!
355   !
356#  define DIM_3d
357#     define ROUTINE_LNK           mpp_lnk_3d
358#     include "mpp_lnk_generic.h90"
359#     undef ROUTINE_LNK
360#     define MULTI
361#     define ROUTINE_LNK           mpp_lnk_3d_ptr
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     undef MULTI
365#  undef DIM_3d
366   !
367   !                       !==  4D array and array of 4D pointer  ==!
368   !
369#  define DIM_4d
370#     define ROUTINE_LNK           mpp_lnk_4d
371#     include "mpp_lnk_generic.h90"
372#     undef ROUTINE_LNK
373#     define MULTI
374#     define ROUTINE_LNK           mpp_lnk_4d_ptr
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     undef MULTI
378#  undef DIM_4d
[6140]379
[9019]380   !!----------------------------------------------------------------------
381   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
382   !!
383   !!   * Argument : dummy argument use in mpp_nfd_... routines
384   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
385   !!                cd_nat :   nature of array grid-points
386   !!                psgn   :   sign used across the north fold boundary
387   !!                kfld   :   optional, number of pt3d arrays
388   !!                cd_mpp :   optional, fill the overlap area only
389   !!                pval   :   optional, background value (used at closed boundaries)
390   !!----------------------------------------------------------------------
391   !
392   !                       !==  2D array and array of 2D pointer  ==!
393   !
394#  define DIM_2d
395#     define ROUTINE_NFD           mpp_nfd_2d
396#     include "mpp_nfd_generic.h90"
397#     undef ROUTINE_NFD
398#     define MULTI
399#     define ROUTINE_NFD           mpp_nfd_2d_ptr
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     undef MULTI
403#  undef DIM_2d
404   !
405   !                       !==  3D array and array of 3D pointer  ==!
406   !
407#  define DIM_3d
408#     define ROUTINE_NFD           mpp_nfd_3d
409#     include "mpp_nfd_generic.h90"
410#     undef ROUTINE_NFD
411#     define MULTI
412#     define ROUTINE_NFD           mpp_nfd_3d_ptr
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     undef MULTI
416#  undef DIM_3d
417   !
418   !                       !==  4D array and array of 4D pointer  ==!
419   !
420#  define DIM_4d
421#     define ROUTINE_NFD           mpp_nfd_4d
422#     include "mpp_nfd_generic.h90"
423#     undef ROUTINE_NFD
424#     define MULTI
425#     define ROUTINE_NFD           mpp_nfd_4d_ptr
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     undef MULTI
429#  undef DIM_4d
[3]430
[1344]431
[9019]432   !!----------------------------------------------------------------------
433   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
434   !!
435   !!   * Argument : dummy argument use in mpp_lnk_... routines
436   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
437   !!                cd_nat :   nature of array grid-points
438   !!                psgn   :   sign used across the north fold boundary
439   !!                kb_bdy :   BDY boundary set
440   !!                kfld   :   optional, number of pt3d arrays
441   !!----------------------------------------------------------------------
442   !
443   !                       !==  2D array and array of 2D pointer  ==!
444   !
445#  define DIM_2d
446#     define ROUTINE_BDY           mpp_lnk_bdy_2d
447#     include "mpp_bdy_generic.h90"
448#     undef ROUTINE_BDY
449#  undef DIM_2d
450   !
451   !                       !==  3D array and array of 3D pointer  ==!
452   !
453#  define DIM_3d
454#     define ROUTINE_BDY           mpp_lnk_bdy_3d
455#     include "mpp_bdy_generic.h90"
456#     undef ROUTINE_BDY
457#  undef DIM_3d
458   !
459   !                       !==  4D array and array of 4D pointer  ==!
460   !
[9890]461#  define DIM_4d
462#     define ROUTINE_BDY           mpp_lnk_bdy_4d
463#     include "mpp_bdy_generic.h90"
464#     undef ROUTINE_BDY
465#  undef DIM_4d
[3]466
[9019]467   !!----------------------------------------------------------------------
468   !!
469   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
[5429]470   
471   
[9019]472   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
[5429]473   
[9019]474   
475   !!----------------------------------------------------------------------
[5429]476
477
[888]478
[1344]479   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]480      !!----------------------------------------------------------------------
481      !!                  ***  routine mppsend  ***
[3764]482      !!
[51]483      !! ** Purpose :   Send messag passing array
484      !!
485      !!----------------------------------------------------------------------
[1344]486      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
487      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
488      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
489      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
490      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
491      !!
492      INTEGER ::   iflag
[51]493      !!----------------------------------------------------------------------
[1344]494      !
[1601]495      SELECT CASE ( cn_mpi_send )
[300]496      CASE ( 'S' )                ! Standard mpi send (blocking)
[9570]497         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]498      CASE ( 'B' )                ! Buffer mpi send (blocking)
[9570]499         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]500      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[1344]501         ! be carefull, one more argument here : the mpi request identifier..
[9570]502         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[300]503      END SELECT
[1344]504      !
[51]505   END SUBROUTINE mppsend
[3]506
507
[3294]508   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]509      !!----------------------------------------------------------------------
510      !!                  ***  routine mpprecv  ***
511      !!
512      !! ** Purpose :   Receive messag passing array
513      !!
514      !!----------------------------------------------------------------------
[1344]515      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
516      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
517      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]518      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]519      !!
[51]520      INTEGER :: istatus(mpi_status_size)
521      INTEGER :: iflag
[3294]522      INTEGER :: use_source
[1344]523      !!----------------------------------------------------------------------
524      !
[3764]525      ! If a specific process number has been passed to the receive call,
[3294]526      ! use that one. Default is to use mpi_any_source
[6140]527      use_source = mpi_any_source
528      IF( PRESENT(ksource) )   use_source = ksource
529      !
[9570]530      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[1344]531      !
[51]532   END SUBROUTINE mpprecv
[3]533
534
[51]535   SUBROUTINE mppgather( ptab, kp, pio )
536      !!----------------------------------------------------------------------
537      !!                   ***  routine mppgather  ***
[3764]538      !!
539      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]540      !!     array which is distributed following the vertical level.
541      !!
[1344]542      !!----------------------------------------------------------------------
[6140]543      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
544      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]545      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]546      !!
[1344]547      INTEGER :: itaille, ierror   ! temporary integer
[51]548      !!---------------------------------------------------------------------
[1344]549      !
550      itaille = jpi * jpj
551      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]552         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[1344]553      !
[51]554   END SUBROUTINE mppgather
[3]555
556
[51]557   SUBROUTINE mppscatter( pio, kp, ptab )
558      !!----------------------------------------------------------------------
559      !!                  ***  routine mppscatter  ***
560      !!
[3764]561      !! ** Purpose :   Transfert between awork array which is distributed
[51]562      !!      following the vertical level and the local subdomain array.
563      !!
564      !!----------------------------------------------------------------------
[6140]565      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
566      INTEGER                             ::   kp     ! Tag (not used with MPI
567      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]568      !!
569      INTEGER :: itaille, ierror   ! temporary integer
[51]570      !!---------------------------------------------------------------------
[1344]571      !
[6140]572      itaille = jpi * jpj
[1344]573      !
574      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]575         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[1344]576      !
[51]577   END SUBROUTINE mppscatter
[3]578
[10397]579   
580   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
581     !!----------------------------------------------------------------------
582      !!                   ***  routine mpp_delay_sum  ***
[10359]583      !!
[10397]584      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
[10359]585      !!
[10292]586      !!----------------------------------------------------------------------
[10359]587      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
588      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
[10397]589      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
590      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
[10359]591      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
592      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
593      !!
[10397]594      INTEGER ::   ji, isz
[10359]595      INTEGER ::   idvar
[10397]596      INTEGER ::   ierr, ilocalcomm
597      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[10359]598      !!----------------------------------------------------------------------
[10292]599      ilocalcomm = mpi_comm_oce
[10359]600      IF( PRESENT(kcom) )   ilocalcomm = kcom
601
[10397]602      isz = SIZE(y_in)
603     
[10359]604      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
605
606      idvar = -1
607      DO ji = 1, nbdelay
[10397]608         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
[10359]609      END DO
[10397]610      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
611
612      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
613         !                                       --------------------------
614         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
615            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
616            DEALLOCATE(todelay(idvar)%z1d)
617            ndelayid(idvar) = -1                                      ! do as if we had no restart
618         ELSE
619            ALLOCATE(todelay(idvar)%y1d(isz))
620            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
621         END IF
622      ENDIF
623     
624      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
625         !                                       --------------------------
626         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
627         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
628         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
629      ENDIF
630
631      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
632
633      ! send back pout from todelay(idvar)%z1d defined at previous call
634      pout(:) = todelay(idvar)%z1d(:)
635
636      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
637      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
638
639   END SUBROUTINE mpp_delay_sum
640
641   
642   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
643      !!----------------------------------------------------------------------
644      !!                   ***  routine mpp_delay_max  ***
645      !!
646      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
647      !!
648      !!----------------------------------------------------------------------
649      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
650      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
651      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
652      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
653      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
654      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
655      !!
656      INTEGER ::   ji, isz
657      INTEGER ::   idvar
658      INTEGER ::   ierr, ilocalcomm
659      !!----------------------------------------------------------------------
660      ilocalcomm = mpi_comm_oce
661      IF( PRESENT(kcom) )   ilocalcomm = kcom
662
663      isz = SIZE(p_in)
664
665      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
666
667      idvar = -1
668      DO ji = 1, nbdelay
669         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
670      END DO
[10359]671      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
672
[10397]673      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
674         !                                       --------------------------
675         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
676            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
677            DEALLOCATE(todelay(idvar)%z1d)
678            ndelayid(idvar) = -1                                      ! do as if we had no restart
679         END IF
[10292]680      ENDIF
[681]681
[10397]682      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
683         !                                       --------------------------
684         ALLOCATE(todelay(idvar)%z1d(isz))
685         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
[10359]686      ENDIF
687
[10397]688      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
689
690      ! send back pout from todelay(idvar)%z1d defined at previous call
691      pout(:) = todelay(idvar)%z1d(:)
692
693      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
694      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
695
[10359]696   END SUBROUTINE mpp_delay_max
[10397]697
[10297]698   
[10397]699   SUBROUTINE mpp_delay_rcv( kid )
700      !!----------------------------------------------------------------------
701      !!                   ***  routine mpp_delay_rcv  ***
702      !!
703      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
704      !!
705      !!----------------------------------------------------------------------
706      INTEGER,INTENT(in   )      ::  kid 
707      INTEGER ::   ierr
708      !!----------------------------------------------------------------------
709      IF( ndelayid(kid) /= -2 ) THEN 
710         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
711         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
712         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
713         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
714         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
715      ENDIF
716   END SUBROUTINE mpp_delay_rcv
717
718   
[9019]719   !!----------------------------------------------------------------------
[10297]720   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
721   !!   
722   !!----------------------------------------------------------------------
723   !!
724#  define OPERATION_MAX
725#  define INTEGER_TYPE
726#  define DIM_0d
727#     define ROUTINE_ALLREDUCE           mppmax_int
728#     include "mpp_allreduce_generic.h90"
729#     undef ROUTINE_ALLREDUCE
730#  undef DIM_0d
731#  define DIM_1d
732#     define ROUTINE_ALLREDUCE           mppmax_a_int
733#     include "mpp_allreduce_generic.h90"
734#     undef ROUTINE_ALLREDUCE
735#  undef DIM_1d
736#  undef INTEGER_TYPE
737!
738#  define REAL_TYPE
739#  define DIM_0d
740#     define ROUTINE_ALLREDUCE           mppmax_real
741#     include "mpp_allreduce_generic.h90"
742#     undef ROUTINE_ALLREDUCE
743#  undef DIM_0d
744#  define DIM_1d
745#     define ROUTINE_ALLREDUCE           mppmax_a_real
746#     include "mpp_allreduce_generic.h90"
747#     undef ROUTINE_ALLREDUCE
748#  undef DIM_1d
749#  undef REAL_TYPE
750#  undef OPERATION_MAX
751   !!----------------------------------------------------------------------
[9019]752   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
753   !!   
754   !!----------------------------------------------------------------------
755   !!
[10297]756#  define OPERATION_MIN
757#  define INTEGER_TYPE
758#  define DIM_0d
759#     define ROUTINE_ALLREDUCE           mppmin_int
760#     include "mpp_allreduce_generic.h90"
761#     undef ROUTINE_ALLREDUCE
762#  undef DIM_0d
763#  define DIM_1d
764#     define ROUTINE_ALLREDUCE           mppmin_a_int
765#     include "mpp_allreduce_generic.h90"
766#     undef ROUTINE_ALLREDUCE
767#  undef DIM_1d
768#  undef INTEGER_TYPE
769!
770#  define REAL_TYPE
771#  define DIM_0d
772#     define ROUTINE_ALLREDUCE           mppmin_real
773#     include "mpp_allreduce_generic.h90"
774#     undef ROUTINE_ALLREDUCE
775#  undef DIM_0d
776#  define DIM_1d
777#     define ROUTINE_ALLREDUCE           mppmin_a_real
778#     include "mpp_allreduce_generic.h90"
779#     undef ROUTINE_ALLREDUCE
780#  undef DIM_1d
781#  undef REAL_TYPE
782#  undef OPERATION_MIN
[3]783
[9019]784   !!----------------------------------------------------------------------
785   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
786   !!   
787   !!   Global sum of 1D array or a variable (integer, real or complex)
788   !!----------------------------------------------------------------------
789   !!
[10297]790#  define OPERATION_SUM
791#  define INTEGER_TYPE
792#  define DIM_0d
793#     define ROUTINE_ALLREDUCE           mppsum_int
794#     include "mpp_allreduce_generic.h90"
795#     undef ROUTINE_ALLREDUCE
796#  undef DIM_0d
797#  define DIM_1d
798#     define ROUTINE_ALLREDUCE           mppsum_a_int
799#     include "mpp_allreduce_generic.h90"
800#     undef ROUTINE_ALLREDUCE
801#  undef DIM_1d
802#  undef INTEGER_TYPE
803!
804#  define REAL_TYPE
805#  define DIM_0d
806#     define ROUTINE_ALLREDUCE           mppsum_real
807#     include "mpp_allreduce_generic.h90"
808#     undef ROUTINE_ALLREDUCE
809#  undef DIM_0d
810#  define DIM_1d
811#     define ROUTINE_ALLREDUCE           mppsum_a_real
812#     include "mpp_allreduce_generic.h90"
813#     undef ROUTINE_ALLREDUCE
814#  undef DIM_1d
815#  undef REAL_TYPE
816#  undef OPERATION_SUM
[3764]817
[10297]818#  define OPERATION_SUM_DD
819#  define COMPLEX_TYPE
820#  define DIM_0d
821#     define ROUTINE_ALLREDUCE           mppsum_realdd
822#     include "mpp_allreduce_generic.h90"
823#     undef ROUTINE_ALLREDUCE
824#  undef DIM_0d
825#  define DIM_1d
826#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
827#     include "mpp_allreduce_generic.h90"
828#     undef ROUTINE_ALLREDUCE
829#  undef DIM_1d
830#  undef COMPLEX_TYPE
831#  undef OPERATION_SUM_DD
832
[10314]833   !!----------------------------------------------------------------------
834   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
835   !!   
836   !!----------------------------------------------------------------------
837   !!
838#  define OPERATION_MINLOC
839#  define DIM_2d
840#     define ROUTINE_LOC           mpp_minloc2d
841#     include "mpp_loc_generic.h90"
842#     undef ROUTINE_LOC
843#  undef DIM_2d
844#  define DIM_3d
845#     define ROUTINE_LOC           mpp_minloc3d
846#     include "mpp_loc_generic.h90"
847#     undef ROUTINE_LOC
848#  undef DIM_3d
849#  undef OPERATION_MINLOC
[6140]850
[10314]851#  define OPERATION_MAXLOC
852#  define DIM_2d
853#     define ROUTINE_LOC           mpp_maxloc2d
854#     include "mpp_loc_generic.h90"
855#     undef ROUTINE_LOC
856#  undef DIM_2d
857#  define DIM_3d
858#     define ROUTINE_LOC           mpp_maxloc3d
859#     include "mpp_loc_generic.h90"
860#     undef ROUTINE_LOC
861#  undef DIM_3d
862#  undef OPERATION_MAXLOC
[9019]863
[1344]864   SUBROUTINE mppsync()
865      !!----------------------------------------------------------------------
866      !!                  ***  routine mppsync  ***
[3764]867      !!
[1344]868      !! ** Purpose :   Massively parallel processors, synchroneous
869      !!
870      !!-----------------------------------------------------------------------
871      INTEGER :: ierror
872      !!-----------------------------------------------------------------------
873      !
[9570]874      CALL mpi_barrier( mpi_comm_oce, ierror )
[1344]875      !
876   END SUBROUTINE mppsync
[3]877
878
[10358]879   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
[1344]880      !!----------------------------------------------------------------------
881      !!                  ***  routine mppstop  ***
[3764]882      !!
[3294]883      !! ** purpose :   Stop massively parallel processors method
[1344]884      !!
885      !!----------------------------------------------------------------------
[10330]886      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
[10358]887      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
888      LOGICAL ::   llfinal, ll_force_abort
[1344]889      INTEGER ::   info
890      !!----------------------------------------------------------------------
[10330]891      llfinal = .FALSE.
892      IF( PRESENT(ldfinal) ) llfinal = ldfinal
[10358]893      ll_force_abort = .FALSE.
894      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
895      !
896      IF(ll_force_abort) THEN
897         CALL mpi_abort( MPI_COMM_WORLD )
898      ELSE
899         CALL mppsync
900         CALL mpi_finalize( info )
901      ENDIF
[10330]902      IF( .NOT. llfinal ) STOP 123456
[1344]903      !
904   END SUBROUTINE mppstop
[3]905
906
[1344]907   SUBROUTINE mpp_comm_free( kcom )
908      !!----------------------------------------------------------------------
909      INTEGER, INTENT(in) ::   kcom
910      !!
911      INTEGER :: ierr
912      !!----------------------------------------------------------------------
913      !
914      CALL MPI_COMM_FREE(kcom, ierr)
915      !
916   END SUBROUTINE mpp_comm_free
[3]917
[869]918
[2715]919   SUBROUTINE mpp_ini_znl( kumout )
[1345]920      !!----------------------------------------------------------------------
921      !!               ***  routine mpp_ini_znl  ***
922      !!
923      !! ** Purpose :   Initialize special communicator for computing zonal sum
924      !!
925      !! ** Method  : - Look for processors in the same row
926      !!              - Put their number in nrank_znl
927      !!              - Create group for the znl processors
928      !!              - Create a communicator for znl processors
929      !!              - Determine if processor should write znl files
930      !!
931      !! ** output
932      !!      ndim_rank_znl = number of processors on the same row
933      !!      ngrp_znl = group ID for the znl processors
934      !!      ncomm_znl = communicator for the ice procs.
935      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
936      !!
937      !!----------------------------------------------------------------------
[2715]938      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]939      !
[2715]940      INTEGER :: jproc      ! dummy loop integer
941      INTEGER :: ierr, ii   ! local integer
942      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
943      !!----------------------------------------------------------------------
[1345]944      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
945      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]946      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]947      !
[2715]948      ALLOCATE( kwork(jpnij), STAT=ierr )
949      IF( ierr /= 0 ) THEN
950         WRITE(kumout, cform_err)
951         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
952         CALL mppstop
953      ENDIF
954
955      IF( jpnj == 1 ) THEN
[1345]956         ngrp_znl  = ngrp_world
[9570]957         ncomm_znl = mpi_comm_oce
[1345]958      ELSE
959         !
[9570]960         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]961         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
962         !-$$        CALL flush(numout)
963         !
964         ! Count number of processors on the same row
965         ndim_rank_znl = 0
966         DO jproc=1,jpnij
967            IF ( kwork(jproc) == njmpp ) THEN
968               ndim_rank_znl = ndim_rank_znl + 1
969            ENDIF
970         END DO
971         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
972         !-$$        CALL flush(numout)
973         ! Allocate the right size to nrank_znl
[1441]974         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]975         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]976         ii = 0
[1345]977         nrank_znl (:) = 0
978         DO jproc=1,jpnij
979            IF ( kwork(jproc) == njmpp) THEN
980               ii = ii + 1
[3764]981               nrank_znl(ii) = jproc -1
[1345]982            ENDIF
983         END DO
984         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
985         !-$$        CALL flush(numout)
986
987         ! Create the opa group
[9570]988         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]989         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
990         !-$$        CALL flush(numout)
991
992         ! Create the znl group from the opa group
993         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
994         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
995         !-$$        CALL flush(numout)
996
997         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]998         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]999         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1000         !-$$        CALL flush(numout)
1001         !
1002      END IF
1003
1004      ! Determines if processor if the first (starting from i=1) on the row
[3764]1005      IF ( jpni == 1 ) THEN
[1345]1006         l_znl_root = .TRUE.
1007      ELSE
1008         l_znl_root = .FALSE.
1009         kwork (1) = nimpp
[10297]1010         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]1011         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1012      END IF
1013
[2715]1014      DEALLOCATE(kwork)
1015
[1345]1016   END SUBROUTINE mpp_ini_znl
1017
1018
[1344]1019   SUBROUTINE mpp_ini_north
1020      !!----------------------------------------------------------------------
1021      !!               ***  routine mpp_ini_north  ***
1022      !!
[3764]1023      !! ** Purpose :   Initialize special communicator for north folding
[1344]1024      !!      condition together with global variables needed in the mpp folding
1025      !!
1026      !! ** Method  : - Look for northern processors
1027      !!              - Put their number in nrank_north
1028      !!              - Create groups for the world processors and the north processors
1029      !!              - Create a communicator for northern processors
1030      !!
1031      !! ** output
1032      !!      njmppmax = njmpp for northern procs
1033      !!      ndim_rank_north = number of processors in the northern line
1034      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1035      !!      ngrp_world = group ID for the world processors
1036      !!      ngrp_north = group ID for the northern processors
1037      !!      ncomm_north = communicator for the northern procs.
1038      !!      north_root = number (in the world) of proc 0 in the northern comm.
1039      !!
1040      !!----------------------------------------------------------------------
1041      INTEGER ::   ierr
1042      INTEGER ::   jjproc
1043      INTEGER ::   ii, ji
1044      !!----------------------------------------------------------------------
1045      !
1046      njmppmax = MAXVAL( njmppt )
1047      !
1048      ! Look for how many procs on the northern boundary
1049      ndim_rank_north = 0
1050      DO jjproc = 1, jpnij
1051         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1052      END DO
1053      !
1054      ! Allocate the right size to nrank_north
[1441]1055      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]1056      ALLOCATE( nrank_north(ndim_rank_north) )
[869]1057
[1344]1058      ! Fill the nrank_north array with proc. number of northern procs.
1059      ! Note : the rank start at 0 in MPI
1060      ii = 0
1061      DO ji = 1, jpnij
1062         IF ( njmppt(ji) == njmppmax   ) THEN
1063            ii=ii+1
1064            nrank_north(ii)=ji-1
1065         END IF
1066      END DO
1067      !
1068      ! create the world group
[9570]1069      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]1070      !
1071      ! Create the North group from the world group
1072      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1073      !
1074      ! Create the North communicator , ie the pool of procs in the north group
[9570]1075      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]1076      !
1077   END SUBROUTINE mpp_ini_north
[869]1078
1079
[9570]1080   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
[1344]1081      !!---------------------------------------------------------------------
1082      !!                   ***  routine mpp_init.opa  ***
1083      !!
1084      !! ** Purpose :: export and attach a MPI buffer for bsend
1085      !!
1086      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1087      !!            but classical mpi_init
[3764]1088      !!
1089      !! History :: 01/11 :: IDRIS initial version for IBM only
[1344]1090      !!            08/04 :: R. Benshila, generalisation
1091      !!---------------------------------------------------------------------
[3764]1092      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
[2481]1093      INTEGER                      , INTENT(inout) ::   ksft
1094      INTEGER                      , INTENT(  out) ::   code
1095      INTEGER                                      ::   ierr, ji
1096      LOGICAL                                      ::   mpi_was_called
[1344]1097      !!---------------------------------------------------------------------
1098      !
1099      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
[532]1100      IF ( code /= MPI_SUCCESS ) THEN
[3764]1101         DO ji = 1, SIZE(ldtxt)
[2481]1102            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]1103         END DO
[2481]1104         WRITE(*, cform_err)
1105         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
[1344]1106         CALL mpi_abort( mpi_comm_world, code, ierr )
[532]1107      ENDIF
[1344]1108      !
1109      IF( .NOT. mpi_was_called ) THEN
1110         CALL mpi_init( code )
[9570]1111         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
[532]1112         IF ( code /= MPI_SUCCESS ) THEN
[3764]1113            DO ji = 1, SIZE(ldtxt)
[2481]1114               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1115            END DO
1116            WRITE(*, cform_err)
1117            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
[532]1118            CALL mpi_abort( mpi_comm_world, code, ierr )
1119         ENDIF
1120      ENDIF
[1344]1121      !
[897]1122      IF( nn_buffer > 0 ) THEN
[2481]1123         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
[897]1124         ! Buffer allocation and attachment
[2481]1125         ALLOCATE( tampon(nn_buffer), stat = ierr )
[3764]1126         IF( ierr /= 0 ) THEN
1127            DO ji = 1, SIZE(ldtxt)
[2481]1128               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1129            END DO
1130            WRITE(*, cform_err)
1131            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1132            CALL mpi_abort( mpi_comm_world, code, ierr )
1133         END IF
1134         CALL mpi_buffer_attach( tampon, nn_buffer, code )
[897]1135      ENDIF
[1344]1136      !
[9570]1137   END SUBROUTINE mpi_init_oce
[3]1138
[9019]1139
1140   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1141      !!---------------------------------------------------------------------
1142      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1143      !!
1144      !!   Modification of original codes written by David H. Bailey
1145      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1146      !!---------------------------------------------------------------------
[9019]1147      INTEGER                     , INTENT(in)    ::   ilen, itype
1148      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1149      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1150      !
1151      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]1152      INTEGER  :: ji, ztmp           ! local scalar
1153      !!---------------------------------------------------------------------
1154      !
[1976]1155      ztmp = itype   ! avoid compilation warning
[9019]1156      !
[1976]1157      DO ji=1,ilen
1158      ! Compute ydda + yddb using Knuth's trick.
1159         zt1  = real(ydda(ji)) + real(yddb(ji))
1160         zerr = zt1 - real(ydda(ji))
1161         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1162                + aimag(ydda(ji)) + aimag(yddb(ji))
1163
1164         ! The result is zt1 + zt2, after normalization.
1165         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1166      END DO
[9019]1167      !
[1976]1168   END SUBROUTINE DDPDD_MPI
1169
[6140]1170
[9019]1171   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
[4990]1172      !!---------------------------------------------------------------------
1173      !!                   ***  routine mpp_lbc_north_icb  ***
1174      !!
1175      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1176      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1177      !!              array with outer extra halo
1178      !!
1179      !! ** Method  :   North fold condition and mpp with more than one proc
1180      !!              in i-direction require a specific treatment. We gather
[9019]1181      !!              the 4+kextj northern lines of the global domain on 1
[4990]1182      !!              processor and apply lbc north-fold on this sub array.
1183      !!              Then we scatter the north fold array back to the processors.
[9019]1184      !!              This routine accounts for an extra halo with icebergs
1185      !!              and assumes ghost rows and columns have been suppressed.
[4990]1186      !!
1187      !!----------------------------------------------------------------------
1188      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1189      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1190      !                                                     !   = T ,  U , V , F or W -points
1191      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1192      !!                                                    ! north fold, =  1. otherwise
[9019]1193      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
[6140]1194      !
[4990]1195      INTEGER ::   ji, jj, jr
1196      INTEGER ::   ierr, itaille, ildi, ilei, iilb
[9019]1197      INTEGER ::   ipj, ij, iproc
[4990]1198      !
1199      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1200      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1201      !!----------------------------------------------------------------------
1202      !
[9019]1203      ipj=4
[9467]1204      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1205     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1206     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
[4990]1207      !
[9019]1208      ztab_e(:,:)      = 0._wp
1209      znorthloc_e(:,:) = 0._wp
[6140]1210      !
[9467]1211      ij = 1 - kextj
1212      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1213      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1214         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
[4990]1215         ij = ij + 1
1216      END DO
1217      !
[9467]1218      itaille = jpimax * ( ipj + 2*kextj )
[10314]1219      !
1220      IF( ln_timing ) CALL tic_tac(.TRUE.)
[9467]1221      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1222         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1223         &                ncomm_north, ierr )
[4990]1224      !
[10314]1225      IF( ln_timing ) CALL tic_tac(.FALSE.)
1226      !
[4990]1227      DO jr = 1, ndim_rank_north            ! recover the global north array
1228         iproc = nrank_north(jr) + 1
1229         ildi = nldit (iproc)
1230         ilei = nleit (iproc)
1231         iilb = nimppt(iproc)
[9467]1232         DO jj = 1-kextj, ipj+kextj
[4990]1233            DO ji = ildi, ilei
1234               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1235            END DO
1236         END DO
1237      END DO
1238
1239      ! 2. North-Fold boundary conditions
1240      ! ----------------------------------
[9467]1241      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
[4990]1242
[9467]1243      ij = 1 - kextj
[4990]1244      !! Scatter back to pt2d
[9467]1245      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
[9019]1246         DO ji= 1, jpi
[4990]1247            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1248         END DO
[9467]1249         ij  = ij +1
[4990]1250      END DO
1251      !
1252      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1253      !
1254   END SUBROUTINE mpp_lbc_north_icb
1255
[6140]1256
[10314]1257   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
[4990]1258      !!----------------------------------------------------------------------
1259      !!                  ***  routine mpp_lnk_2d_icb  ***
1260      !!
[9019]1261      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1262      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1263      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
[4990]1264      !!
1265      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1266      !!      between processors following neighboring subdomains.
1267      !!            domain parameters
[9019]1268      !!                    jpi    : first dimension of the local subdomain
1269      !!                    jpj    : second dimension of the local subdomain
1270      !!                    kexti  : number of columns for extra outer halo
1271      !!                    kextj  : number of rows for extra outer halo
[4990]1272      !!                    nbondi : mark for "east-west local boundary"
1273      !!                    nbondj : mark for "north-south local boundary"
1274      !!                    noea   : number for local neighboring processors
1275      !!                    nowe   : number for local neighboring processors
1276      !!                    noso   : number for local neighboring processors
1277      !!                    nono   : number for local neighboring processors
1278      !!----------------------------------------------------------------------
[10314]1279      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
[9019]1280      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1281      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1282      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1283      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1284      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1285      !
[4990]1286      INTEGER  ::   jl   ! dummy loop indices
[9019]1287      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1288      INTEGER  ::   ipreci, iprecj             !   -       -
[4990]1289      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1290      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1291      !!
[9019]1292      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1293      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
[4990]1294      !!----------------------------------------------------------------------
1295
[9019]1296      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1297      iprecj = nn_hls + kextj
[4990]1298
[10314]1299      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
[4990]1300
1301      ! 1. standard boundary treatment
1302      ! ------------------------------
1303      ! Order matters Here !!!!
1304      !
1305      !                                      ! East-West boundaries
1306      !                                           !* Cyclic east-west
[9667]1307      IF( l_Iperio ) THEN
1308         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1309         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
[4990]1310         !
1311      ELSE                                        !* closed
[9667]1312         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1313                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
[4990]1314      ENDIF
[9667]1315      !                                      ! North-South boundaries
1316      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1317         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1318         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1319      ELSE                                        !* closed
1320         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1321                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1322      ENDIF
[4990]1323      !
1324
1325      ! north fold treatment
1326      ! -----------------------
1327      IF( npolj /= 0 ) THEN
1328         !
1329         SELECT CASE ( jpni )
[9019]1330                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1331                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
[4990]1332         END SELECT
1333         !
1334      ENDIF
1335
1336      ! 2. East and west directions exchange
1337      ! ------------------------------------
1338      ! we play with the neigbours AND the row number because of the periodicity
1339      !
1340      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1341      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
[9019]1342         iihom = jpi-nreci-kexti
[4990]1343         DO jl = 1, ipreci
[9019]1344            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
[4990]1345            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1346         END DO
1347      END SELECT
1348      !
1349      !                           ! Migrations
[9019]1350      imigr = ipreci * ( jpj + 2*kextj )
[4990]1351      !
[10314]1352      IF( ln_timing ) CALL tic_tac(.TRUE.)
1353      !
[4990]1354      SELECT CASE ( nbondi )
1355      CASE ( -1 )
[9019]1356         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1357         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
[4990]1358         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1359      CASE ( 0 )
[9019]1360         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1361         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1362         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1363         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1364         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1365         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1366      CASE ( 1 )
[9019]1367         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1368         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1369         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1370      END SELECT
1371      !
[10314]1372      IF( ln_timing ) CALL tic_tac(.FALSE.)
1373      !
[4990]1374      !                           ! Write Dirichlet lateral conditions
[9019]1375      iihom = jpi - nn_hls
[4990]1376      !
1377      SELECT CASE ( nbondi )
1378      CASE ( -1 )
1379         DO jl = 1, ipreci
1380            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1381         END DO
1382      CASE ( 0 )
1383         DO jl = 1, ipreci
[9019]1384            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1385            pt2d(iihom+jl,:) = r2dew(:,jl,2)
[4990]1386         END DO
1387      CASE ( 1 )
1388         DO jl = 1, ipreci
[9019]1389            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
[4990]1390         END DO
1391      END SELECT
1392
1393
1394      ! 3. North and south directions
1395      ! -----------------------------
1396      ! always closed : we play only with the neigbours
1397      !
1398      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
[9019]1399         ijhom = jpj-nrecj-kextj
[4990]1400         DO jl = 1, iprecj
1401            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
[9019]1402            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
[4990]1403         END DO
1404      ENDIF
1405      !
1406      !                           ! Migrations
[9019]1407      imigr = iprecj * ( jpi + 2*kexti )
[4990]1408      !
[10314]1409      IF( ln_timing ) CALL tic_tac(.TRUE.)
1410      !
[4990]1411      SELECT CASE ( nbondj )
1412      CASE ( -1 )
[9019]1413         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1414         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
[4990]1415         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1416      CASE ( 0 )
[9019]1417         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1418         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1419         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1420         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1421         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1422         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1423      CASE ( 1 )
[9019]1424         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1425         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1426         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1427      END SELECT
1428      !
[10314]1429      IF( ln_timing ) CALL tic_tac(.FALSE.)
1430      !
[4990]1431      !                           ! Write Dirichlet lateral conditions
[9019]1432      ijhom = jpj - nn_hls
[4990]1433      !
1434      SELECT CASE ( nbondj )
1435      CASE ( -1 )
1436         DO jl = 1, iprecj
1437            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1438         END DO
1439      CASE ( 0 )
1440         DO jl = 1, iprecj
[9019]1441            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1442            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
[4990]1443         END DO
1444      CASE ( 1 )
1445         DO jl = 1, iprecj
[9019]1446            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
[4990]1447         END DO
1448      END SELECT
[9019]1449      !
[4990]1450   END SUBROUTINE mpp_lnk_2d_icb
[10172]1451
[10314]1452
1453   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1454      !!----------------------------------------------------------------------
1455      !!                  ***  routine mpp_report  ***
1456      !!
1457      !! ** Purpose :   report use of mpp routines per time-setp
1458      !!
1459      !!----------------------------------------------------------------------
1460      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1461      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1462      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1463      !!
1464      LOGICAL ::   ll_lbc, ll_glb
1465      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1466      !!----------------------------------------------------------------------
1467      !
1468      ll_lbc = .FALSE.
1469      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1470      ll_glb = .FALSE.
1471      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1472      !
1473      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
[10397]1474      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1475      ncom_freq = ncom_fsbc
[10314]1476      !
1477      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1478         IF( ll_lbc ) THEN
1479            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1480            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1481            n_sequence_lbc = n_sequence_lbc + 1
[10397]1482            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
[10314]1483            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1484            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1485            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1486         ENDIF
1487         IF( ll_glb ) THEN
1488            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1489            n_sequence_glb = n_sequence_glb + 1
[10397]1490            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
[10314]1491            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1492         ENDIF
1493      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1494         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1495         WRITE(numcom,*) ' '
1496         WRITE(numcom,*) ' ------------------------------------------------------------'
1497         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1498         WRITE(numcom,*) ' ------------------------------------------------------------'
1499         WRITE(numcom,*) ' '
1500         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1501         jj = 0; jk = 0; jf = 0; jh = 0
1502         DO ji = 1, n_sequence_lbc
1503            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1504            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1505            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1506            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1507         END DO
1508         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1509         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1510         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1511         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1512         WRITE(numcom,*) ' '
1513         WRITE(numcom,*) ' lbc_lnk called'
1514         jj = 1
1515         DO ji = 2, n_sequence_lbc
1516            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1517               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1518               jj = 0
1519            END IF
1520            jj = jj + 1 
1521         END DO
1522         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1523         WRITE(numcom,*) ' '
1524         IF ( n_sequence_glb > 0 ) THEN
1525            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1526            jj = 1
1527            DO ji = 2, n_sequence_glb
1528               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1529                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1530                  jj = 0
1531               END IF
1532               jj = jj + 1 
1533            END DO
1534            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1535            DEALLOCATE(crname_glb)
1536         ELSE
1537            WRITE(numcom,*) ' No MPI global communication '
1538         ENDIF
1539         WRITE(numcom,*) ' '
1540         WRITE(numcom,*) ' -----------------------------------------------'
1541         WRITE(numcom,*) ' '
1542         DEALLOCATE(ncomm_sequence)
1543         DEALLOCATE(crname_lbc)
1544      ENDIF
1545   END SUBROUTINE mpp_report
1546
[6140]1547   
[10300]1548   SUBROUTINE tic_tac (ld_tic, ld_global)
[10172]1549
[10300]1550    LOGICAL,           INTENT(IN) :: ld_tic
1551    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1552    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1553    REAL(wp),               SAVE :: tic_ct = 0._wp
1554    INTEGER :: ii
[10172]1555
1556    IF( ncom_stp <= nit000 ) RETURN
1557    IF( ncom_stp == nitend ) RETURN
[10300]1558    ii = 1
1559    IF( PRESENT( ld_global ) ) THEN
1560       IF( ld_global ) ii = 2
1561    END IF
[10172]1562   
[10300]1563    IF ( ld_tic ) THEN
1564       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[10172]1565       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1566    ELSE
[10300]1567       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
[10172]1568       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1569    ENDIF
1570   
1571   END SUBROUTINE tic_tac
1572
1573   
[13]1574#else
1575   !!----------------------------------------------------------------------
1576   !!   Default case:            Dummy module        share memory computing
1577   !!----------------------------------------------------------------------
[2715]1578   USE in_out_manager
[1976]1579
[13]1580   INTERFACE mpp_sum
[10314]1581      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
[13]1582   END INTERFACE
1583   INTERFACE mpp_max
[681]1584      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]1585   END INTERFACE
1586   INTERFACE mpp_min
1587      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1588   END INTERFACE
[1344]1589   INTERFACE mpp_minloc
1590      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1591   END INTERFACE
1592   INTERFACE mpp_maxloc
1593      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1594   END INTERFACE
[3]1595
[13]1596   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
[4147]1597   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
[9570]1598   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
[10359]1599
1600   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1601   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
[10397]1602   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1603   TYPE ::   DELAYARR
1604      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1605      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1606   END TYPE DELAYARR
1607   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
[10359]1608   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
[2715]1609   !!----------------------------------------------------------------------
[13]1610CONTAINS
[3]1611
[2715]1612   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1613      INTEGER, INTENT(in) ::   kumout
1614      lib_mpp_alloc = 0
1615   END FUNCTION lib_mpp_alloc
1616
[5407]1617   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
[1579]1618      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
[3764]1619      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
[5407]1620      CHARACTER(len=*) ::   ldname
[4314]1621      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
[9570]1622      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
[5412]1623      function_value = 0
[1579]1624      IF( .FALSE. )   ldtxt(:) = 'never done'
[5407]1625      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
[13]1626   END FUNCTION mynode
[3]1627
[13]1628   SUBROUTINE mppsync                       ! Dummy routine
1629   END SUBROUTINE mppsync
[3]1630
[10314]1631   !!----------------------------------------------------------------------
1632   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1633   !!   
1634   !!----------------------------------------------------------------------
1635   !!
1636#  define OPERATION_MAX
1637#  define INTEGER_TYPE
1638#  define DIM_0d
1639#     define ROUTINE_ALLREDUCE           mppmax_int
1640#     include "mpp_allreduce_generic.h90"
1641#     undef ROUTINE_ALLREDUCE
1642#  undef DIM_0d
1643#  define DIM_1d
1644#     define ROUTINE_ALLREDUCE           mppmax_a_int
1645#     include "mpp_allreduce_generic.h90"
1646#     undef ROUTINE_ALLREDUCE
1647#  undef DIM_1d
1648#  undef INTEGER_TYPE
1649!
1650#  define REAL_TYPE
1651#  define DIM_0d
1652#     define ROUTINE_ALLREDUCE           mppmax_real
1653#     include "mpp_allreduce_generic.h90"
1654#     undef ROUTINE_ALLREDUCE
1655#  undef DIM_0d
1656#  define DIM_1d
1657#     define ROUTINE_ALLREDUCE           mppmax_a_real
1658#     include "mpp_allreduce_generic.h90"
1659#     undef ROUTINE_ALLREDUCE
1660#  undef DIM_1d
1661#  undef REAL_TYPE
1662#  undef OPERATION_MAX
1663   !!----------------------------------------------------------------------
1664   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1665   !!   
1666   !!----------------------------------------------------------------------
1667   !!
1668#  define OPERATION_MIN
1669#  define INTEGER_TYPE
1670#  define DIM_0d
1671#     define ROUTINE_ALLREDUCE           mppmin_int
1672#     include "mpp_allreduce_generic.h90"
1673#     undef ROUTINE_ALLREDUCE
1674#  undef DIM_0d
1675#  define DIM_1d
1676#     define ROUTINE_ALLREDUCE           mppmin_a_int
1677#     include "mpp_allreduce_generic.h90"
1678#     undef ROUTINE_ALLREDUCE
1679#  undef DIM_1d
1680#  undef INTEGER_TYPE
1681!
1682#  define REAL_TYPE
1683#  define DIM_0d
1684#     define ROUTINE_ALLREDUCE           mppmin_real
1685#     include "mpp_allreduce_generic.h90"
1686#     undef ROUTINE_ALLREDUCE
1687#  undef DIM_0d
1688#  define DIM_1d
1689#     define ROUTINE_ALLREDUCE           mppmin_a_real
1690#     include "mpp_allreduce_generic.h90"
1691#     undef ROUTINE_ALLREDUCE
1692#  undef DIM_1d
1693#  undef REAL_TYPE
1694#  undef OPERATION_MIN
[3]1695
[10314]1696   !!----------------------------------------------------------------------
1697   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1698   !!   
1699   !!   Global sum of 1D array or a variable (integer, real or complex)
1700   !!----------------------------------------------------------------------
1701   !!
1702#  define OPERATION_SUM
1703#  define INTEGER_TYPE
1704#  define DIM_0d
1705#     define ROUTINE_ALLREDUCE           mppsum_int
1706#     include "mpp_allreduce_generic.h90"
1707#     undef ROUTINE_ALLREDUCE
1708#  undef DIM_0d
1709#  define DIM_1d
1710#     define ROUTINE_ALLREDUCE           mppsum_a_int
1711#     include "mpp_allreduce_generic.h90"
1712#     undef ROUTINE_ALLREDUCE
1713#  undef DIM_1d
1714#  undef INTEGER_TYPE
1715!
1716#  define REAL_TYPE
1717#  define DIM_0d
1718#     define ROUTINE_ALLREDUCE           mppsum_real
1719#     include "mpp_allreduce_generic.h90"
1720#     undef ROUTINE_ALLREDUCE
1721#  undef DIM_0d
1722#  define DIM_1d
1723#     define ROUTINE_ALLREDUCE           mppsum_a_real
1724#     include "mpp_allreduce_generic.h90"
1725#     undef ROUTINE_ALLREDUCE
1726#  undef DIM_1d
1727#  undef REAL_TYPE
1728#  undef OPERATION_SUM
[3]1729
[10314]1730#  define OPERATION_SUM_DD
1731#  define COMPLEX_TYPE
1732#  define DIM_0d
1733#     define ROUTINE_ALLREDUCE           mppsum_realdd
1734#     include "mpp_allreduce_generic.h90"
1735#     undef ROUTINE_ALLREDUCE
1736#  undef DIM_0d
1737#  define DIM_1d
1738#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1739#     include "mpp_allreduce_generic.h90"
1740#     undef ROUTINE_ALLREDUCE
1741#  undef DIM_1d
1742#  undef COMPLEX_TYPE
1743#  undef OPERATION_SUM_DD
[3]1744
[10314]1745   !!----------------------------------------------------------------------
1746   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1747   !!   
1748   !!----------------------------------------------------------------------
1749   !!
1750#  define OPERATION_MINLOC
1751#  define DIM_2d
1752#     define ROUTINE_LOC           mpp_minloc2d
1753#     include "mpp_loc_generic.h90"
1754#     undef ROUTINE_LOC
1755#  undef DIM_2d
1756#  define DIM_3d
1757#     define ROUTINE_LOC           mpp_minloc3d
1758#     include "mpp_loc_generic.h90"
1759#     undef ROUTINE_LOC
1760#  undef DIM_3d
1761#  undef OPERATION_MINLOC
[2480]1762
[10314]1763#  define OPERATION_MAXLOC
1764#  define DIM_2d
1765#     define ROUTINE_LOC           mpp_maxloc2d
1766#     include "mpp_loc_generic.h90"
1767#     undef ROUTINE_LOC
1768#  undef DIM_2d
1769#  define DIM_3d
1770#     define ROUTINE_LOC           mpp_maxloc3d
1771#     include "mpp_loc_generic.h90"
1772#     undef ROUTINE_LOC
1773#  undef DIM_3d
1774#  undef OPERATION_MAXLOC
[13]1775
[10397]1776   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
[10359]1777      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1778      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
[10397]1779      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1780      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
[10359]1781      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1782      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[10397]1783      !
1784      pout(:) = REAL(y_in(:), wp)
1785   END SUBROUTINE mpp_delay_sum
1786
1787   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1788      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1789      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1790      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1791      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1792      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1793      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1794      !
1795      pout(:) = p_in(:)
[10359]1796   END SUBROUTINE mpp_delay_max
[10292]1797
[10397]1798   SUBROUTINE mpp_delay_rcv( kid )
1799      INTEGER,INTENT(in   )      ::  kid 
1800      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1801   END SUBROUTINE mpp_delay_rcv
1802   
[10358]1803   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1804      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1805      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
[3799]1806      STOP      ! non MPP case, just stop the run
[51]1807   END SUBROUTINE mppstop
1808
[2715]1809   SUBROUTINE mpp_ini_znl( knum )
1810      INTEGER :: knum
1811      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
[1345]1812   END SUBROUTINE mpp_ini_znl
1813
[1344]1814   SUBROUTINE mpp_comm_free( kcom )
[869]1815      INTEGER :: kcom
[1344]1816      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
[869]1817   END SUBROUTINE mpp_comm_free
[9019]1818   
[3]1819#endif
[2715]1820
[13]1821   !!----------------------------------------------------------------------
[4147]1822   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
[2715]1823   !!----------------------------------------------------------------------
1824
1825   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1826      &                 cd6, cd7, cd8, cd9, cd10 )
1827      !!----------------------------------------------------------------------
1828      !!                  ***  ROUTINE  stop_opa  ***
1829      !!
[3764]1830      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1831      !!                increment the error number (nstop) by one.
1832      !!----------------------------------------------------------------------
1833      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1834      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1835      !!----------------------------------------------------------------------
1836      !
[3764]1837      nstop = nstop + 1
[2715]1838      IF(lwp) THEN
1839         WRITE(numout,cform_err)
[10330]1840         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1841         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1842         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1843         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1844         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1845         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1846         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1847         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1848         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1849         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[2715]1850      ENDIF
1851                               CALL FLUSH(numout    )
1852      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1853      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1854      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1855      !
1856      IF( cd1 == 'STOP' ) THEN
1857         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[10407]1858         CALL mppstop(ld_force_abort = .true.)
[2715]1859      ENDIF
1860      !
1861   END SUBROUTINE ctl_stop
1862
1863
1864   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1865      &                 cd6, cd7, cd8, cd9, cd10 )
1866      !!----------------------------------------------------------------------
1867      !!                  ***  ROUTINE  stop_warn  ***
1868      !!
[3764]1869      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1870      !!                increment the warning number (nwarn) by one.
1871      !!----------------------------------------------------------------------
1872      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1873      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1874      !!----------------------------------------------------------------------
[3764]1875      !
1876      nwarn = nwarn + 1
[2715]1877      IF(lwp) THEN
1878         WRITE(numout,cform_war)
[10330]1879         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1880         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1881         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1882         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1883         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1884         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1885         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1886         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1887         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1888         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
[2715]1889      ENDIF
1890      CALL FLUSH(numout)
1891      !
1892   END SUBROUTINE ctl_warn
1893
1894
1895   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1896      !!----------------------------------------------------------------------
1897      !!                  ***  ROUTINE ctl_opn  ***
1898      !!
1899      !! ** Purpose :   Open file and check if required file is available.
1900      !!
1901      !! ** Method  :   Fortan open
1902      !!----------------------------------------------------------------------
1903      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1904      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1905      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1906      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1907      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1908      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1909      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1910      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1911      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1912      !
[2715]1913      CHARACTER(len=80) ::   clfile
1914      INTEGER           ::   iost
1915      !!----------------------------------------------------------------------
[5836]1916      !
[2715]1917      ! adapt filename
1918      ! ----------------
1919      clfile = TRIM(cdfile)
1920      IF( PRESENT( karea ) ) THEN
1921         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1922      ENDIF
1923#if defined key_agrif
1924      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1925      knum=Agrif_Get_Unit()
1926#else
1927      knum=get_unit()
1928#endif
[10330]1929      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1930      !
[2715]1931      iost=0
[10368]1932      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
[10358]1933         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
[10368]1934      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
[10358]1935         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1936      ELSE
[10358]1937         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1938      ENDIF
[10358]1939      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1940         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1941      IF( iost == 0 ) THEN
1942         IF(ldwp) THEN
[10358]1943            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1944            WRITE(kout,*) '     unit   = ', knum
1945            WRITE(kout,*) '     status = ', cdstat
1946            WRITE(kout,*) '     form   = ', cdform
1947            WRITE(kout,*) '     access = ', cdacce
1948            WRITE(kout,*)
1949         ENDIF
1950      ENDIF
1951100   CONTINUE
1952      IF( iost /= 0 ) THEN
1953         IF(ldwp) THEN
1954            WRITE(kout,*)
[10358]1955            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
[2715]1956            WRITE(kout,*) ' =======   ===  '
1957            WRITE(kout,*) '           unit   = ', knum
1958            WRITE(kout,*) '           status = ', cdstat
1959            WRITE(kout,*) '           form   = ', cdform
1960            WRITE(kout,*) '           access = ', cdacce
1961            WRITE(kout,*) '           iostat = ', iost
1962            WRITE(kout,*) '           we stop. verify the file '
1963            WRITE(kout,*)
[9438]1964         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1965            WRITE(*,*)
[10358]1966            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
[9438]1967            WRITE(*,*) ' =======   ===  '
1968            WRITE(*,*) '           unit   = ', knum
1969            WRITE(*,*) '           status = ', cdstat
1970            WRITE(*,*) '           form   = ', cdform
1971            WRITE(*,*) '           access = ', cdacce
1972            WRITE(*,*) '           iostat = ', iost
1973            WRITE(*,*) '           we stop. verify the file '
1974            WRITE(*,*)
[2715]1975         ENDIF
[9019]1976         CALL FLUSH( kout ) 
[2715]1977         STOP 'ctl_opn bad opening'
1978      ENDIF
[5836]1979      !
[2715]1980   END SUBROUTINE ctl_opn
1981
[5836]1982
[4147]1983   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1984      !!----------------------------------------------------------------------
1985      !!                  ***  ROUTINE ctl_nam  ***
1986      !!
1987      !! ** Purpose :   Informations when error while reading a namelist
1988      !!
1989      !! ** Method  :   Fortan open
1990      !!----------------------------------------------------------------------
[5836]1991      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1992      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
[7646]1993      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
[5836]1994      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
[4147]1995      !!----------------------------------------------------------------------
[5836]1996      !
[7646]1997      WRITE (clios, '(I5.0)')   kios
[4147]1998      IF( kios < 0 ) THEN         
[5836]1999         CALL ctl_warn( 'end of record or file while reading namelist '   &
2000            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]2001      ENDIF
[5836]2002      !
[4147]2003      IF( kios > 0 ) THEN
[5836]2004         CALL ctl_stop( 'misspelled variable in namelist '   &
2005            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]2006      ENDIF
2007      kios = 0
2008      RETURN
[5836]2009      !
[4147]2010   END SUBROUTINE ctl_nam
2011
[5836]2012
[2715]2013   INTEGER FUNCTION get_unit()
2014      !!----------------------------------------------------------------------
2015      !!                  ***  FUNCTION  get_unit  ***
2016      !!
2017      !! ** Purpose :   return the index of an unused logical unit
2018      !!----------------------------------------------------------------------
[3764]2019      LOGICAL :: llopn
[2715]2020      !!----------------------------------------------------------------------
2021      !
2022      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2023      llopn = .TRUE.
2024      DO WHILE( (get_unit < 998) .AND. llopn )
2025         get_unit = get_unit + 1
2026         INQUIRE( unit = get_unit, opened = llopn )
2027      END DO
2028      IF( (get_unit == 999) .AND. llopn ) THEN
2029         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2030         get_unit = -1
2031      ENDIF
2032      !
2033   END FUNCTION get_unit
2034
2035   !!----------------------------------------------------------------------
[3]2036END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.