New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/fix_ticket2238_solution2/src/OCE/LBC – NEMO

source: NEMO/branches/2019/fix_ticket2238_solution2/src/OCE/LBC/lib_mpp.F90 @ 10679

Last change on this file since 10679 was 10679, checked in by mathiot, 5 years ago

branch for solution 2 of ticket #2238

  • Property svn:keywords set to Id
File size: 77.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpprecv       :
44   !!   mppsend       :
45   !!   mppscatter    :
46   !!   mppgather     :
47   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
48   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
49   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
50   !!   mpp_minloc    :
51   !!   mpp_maxloc    :
52   !!   mppsync       :
53   !!   mppstop       :
54   !!   mpp_ini_north : initialisation of north fold
55   !!----------------------------------------------------------------------
56   USE dom_oce        ! ocean space and time domain
57   USE lbcnfd         ! north fold treatment
58   USE in_out_manager ! I/O manager
59
60   IMPLICIT NONE
61   PRIVATE
62
63   INTERFACE mpp_nfd
64      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
65      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
66   END INTERFACE
67
68   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
69   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
70   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
71   !
72!!gm  this should be useless
73   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
74   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
75!!gm end
76   !
77   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
78   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
79   PUBLIC   mpp_ini_north
80   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
81   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
82   PUBLIC   mppscatter, mppgather
83   PUBLIC   mpp_ini_znl
84   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
85   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
86   
87   !! * Interfaces
88   !! define generic interface for these routine as they are called sometimes
89   !! with scalar arguments instead of array arguments, which causes problems
90   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
91   INTERFACE mpp_min
92      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
93   END INTERFACE
94   INTERFACE mpp_max
95      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
96   END INTERFACE
97   INTERFACE mpp_sum
98      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
99         &             mppsum_realdd, mppsum_a_realdd
100   END INTERFACE
101   INTERFACE mpp_minloc
102      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
103   END INTERFACE
104   INTERFACE mpp_maxloc
105      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
106   END INTERFACE
107
108   !! ========================= !!
109   !!  MPI  variable definition !!
110   !! ========================= !!
111!$AGRIF_DO_NOT_TREAT
112   INCLUDE 'mpif.h'
113!$AGRIF_END_DO_NOT_TREAT
114
115   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
116
117   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
118
119   INTEGER, PUBLIC ::   mppsize        ! number of process
120   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
121!$AGRIF_DO_NOT_TREAT
122   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
123!$AGRIF_END_DO_NOT_TREAT
124
125   INTEGER :: MPI_SUMDD
126
127   ! variables used for zonal integration
128   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
129   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
130   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
131   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
132   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
133
134   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
135   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
136   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
137   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
138   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
139   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
140   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
141   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
142   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
143
144   ! Type of send : standard, buffered, immediate
145   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
146   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
147   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
148
149   ! Communications summary report
150   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
151   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
152   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
153   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
154   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
155   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
156   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
157   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
158   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 3000          !: max number of communication record
159   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
160   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
161   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
162   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
163   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
164   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
165   !: name (used as id) of allreduce-delayed operations
166   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
167   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
168   !: component name where the allreduce-delayed operation is performed
169   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
170   TYPE, PUBLIC ::   DELAYARR
171      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
172      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
173   END TYPE DELAYARR
174   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
175   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
176
177   ! timing summary report
178   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
179   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
180   
181   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
182
183   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
184   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
185
186   !!----------------------------------------------------------------------
187   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
188   !! $Id$
189   !! Software governed by the CeCILL license (see ./LICENSE)
190   !!----------------------------------------------------------------------
191CONTAINS
192
193   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
194      !!----------------------------------------------------------------------
195      !!                  ***  routine mynode  ***
196      !!
197      !! ** Purpose :   Find processor unit
198      !!----------------------------------------------------------------------
199      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
200      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
201      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
202      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
203      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
204      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
205      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
206      !
207      INTEGER ::   mynode, ierr, code, ji, ii, ios
208      LOGICAL ::   mpi_was_called
209      !
210      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
211      !!----------------------------------------------------------------------
212      !
213      ii = 1
214      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
215      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
216      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
217      !
218      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
219      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
220901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
221      !
222      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
223      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
224902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
225      !
226      !                              ! control print
227      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
228      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
229      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
230      !
231      IF( jpni < 1 .OR. jpnj < 1  ) THEN
232         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
233      ELSE
234         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
235         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
236      ENDIF
237
238      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
239
240      CALL mpi_initialized ( mpi_was_called, code )
241      IF( code /= MPI_SUCCESS ) THEN
242         DO ji = 1, SIZE(ldtxt)
243            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
244         END DO
245         WRITE(*, cform_err)
246         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
247         CALL mpi_abort( mpi_comm_world, code, ierr )
248      ENDIF
249
250      IF( mpi_was_called ) THEN
251         !
252         SELECT CASE ( cn_mpi_send )
253         CASE ( 'S' )                ! Standard mpi send (blocking)
254            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
255         CASE ( 'B' )                ! Buffer mpi send (blocking)
256            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
257            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
258         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
259            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
260            l_isend = .TRUE.
261         CASE DEFAULT
262            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
263            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
264            kstop = kstop + 1
265         END SELECT
266         !
267      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
268         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
269         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
270         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
271         kstop = kstop + 1
272      ELSE
273         SELECT CASE ( cn_mpi_send )
274         CASE ( 'S' )                ! Standard mpi send (blocking)
275            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
276            CALL mpi_init( ierr )
277         CASE ( 'B' )                ! Buffer mpi send (blocking)
278            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
279            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
280         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
281            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
282            l_isend = .TRUE.
283            CALL mpi_init( ierr )
284         CASE DEFAULT
285            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
286            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
287            kstop = kstop + 1
288         END SELECT
289         !
290      ENDIF
291
292      IF( PRESENT(localComm) ) THEN
293         IF( Agrif_Root() ) THEN
294            mpi_comm_oce = localComm
295         ENDIF
296      ELSE
297         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
298         IF( code /= MPI_SUCCESS ) THEN
299            DO ji = 1, SIZE(ldtxt)
300               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
301            END DO
302            WRITE(*, cform_err)
303            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
304            CALL mpi_abort( mpi_comm_world, code, ierr )
305         ENDIF
306      ENDIF
307
308#if defined key_agrif
309      IF( Agrif_Root() ) THEN
310         CALL Agrif_MPI_Init(mpi_comm_oce)
311      ELSE
312         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
313      ENDIF
314#endif
315
316      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
317      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
318      mynode = mpprank
319
320      IF( mynode == 0 ) THEN
321         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
322         WRITE(kumond, nammpp)     
323      ENDIF
324      !
325      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
326      !
327   END FUNCTION mynode
328
329   !!----------------------------------------------------------------------
330   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
331   !!
332   !!   * Argument : dummy argument use in mpp_lnk_... routines
333   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
334   !!                cd_nat :   nature of array grid-points
335   !!                psgn   :   sign used across the north fold boundary
336   !!                kfld   :   optional, number of pt3d arrays
337   !!                cd_mpp :   optional, fill the overlap area only
338   !!                pval   :   optional, background value (used at closed boundaries)
339   !!----------------------------------------------------------------------
340   !
341   !                       !==  2D array and array of 2D pointer  ==!
342   !
343#  define DIM_2d
344#     define ROUTINE_LNK           mpp_lnk_2d
345#     include "mpp_lnk_generic.h90"
346#     undef ROUTINE_LNK
347#     define MULTI
348#     define ROUTINE_LNK           mpp_lnk_2d_ptr
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     undef MULTI
352#  undef DIM_2d
353   !
354   !                       !==  3D array and array of 3D pointer  ==!
355   !
356#  define DIM_3d
357#     define ROUTINE_LNK           mpp_lnk_3d
358#     include "mpp_lnk_generic.h90"
359#     undef ROUTINE_LNK
360#     define MULTI
361#     define ROUTINE_LNK           mpp_lnk_3d_ptr
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     undef MULTI
365#  undef DIM_3d
366   !
367   !                       !==  4D array and array of 4D pointer  ==!
368   !
369#  define DIM_4d
370#     define ROUTINE_LNK           mpp_lnk_4d
371#     include "mpp_lnk_generic.h90"
372#     undef ROUTINE_LNK
373#     define MULTI
374#     define ROUTINE_LNK           mpp_lnk_4d_ptr
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     undef MULTI
378#  undef DIM_4d
379
380   !!----------------------------------------------------------------------
381   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
382   !!
383   !!   * Argument : dummy argument use in mpp_nfd_... routines
384   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
385   !!                cd_nat :   nature of array grid-points
386   !!                psgn   :   sign used across the north fold boundary
387   !!                kfld   :   optional, number of pt3d arrays
388   !!                cd_mpp :   optional, fill the overlap area only
389   !!                pval   :   optional, background value (used at closed boundaries)
390   !!----------------------------------------------------------------------
391   !
392   !                       !==  2D array and array of 2D pointer  ==!
393   !
394#  define DIM_2d
395#     define ROUTINE_NFD           mpp_nfd_2d
396#     include "mpp_nfd_generic.h90"
397#     undef ROUTINE_NFD
398#     define MULTI
399#     define ROUTINE_NFD           mpp_nfd_2d_ptr
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     undef MULTI
403#  undef DIM_2d
404   !
405   !                       !==  3D array and array of 3D pointer  ==!
406   !
407#  define DIM_3d
408#     define ROUTINE_NFD           mpp_nfd_3d
409#     include "mpp_nfd_generic.h90"
410#     undef ROUTINE_NFD
411#     define MULTI
412#     define ROUTINE_NFD           mpp_nfd_3d_ptr
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     undef MULTI
416#  undef DIM_3d
417   !
418   !                       !==  4D array and array of 4D pointer  ==!
419   !
420#  define DIM_4d
421#     define ROUTINE_NFD           mpp_nfd_4d
422#     include "mpp_nfd_generic.h90"
423#     undef ROUTINE_NFD
424#     define MULTI
425#     define ROUTINE_NFD           mpp_nfd_4d_ptr
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     undef MULTI
429#  undef DIM_4d
430
431
432   !!----------------------------------------------------------------------
433   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
434   !!
435   !!   * Argument : dummy argument use in mpp_lnk_... routines
436   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
437   !!                cd_nat :   nature of array grid-points
438   !!                psgn   :   sign used across the north fold boundary
439   !!                kb_bdy :   BDY boundary set
440   !!                kfld   :   optional, number of pt3d arrays
441   !!----------------------------------------------------------------------
442   !
443   !                       !==  2D array and array of 2D pointer  ==!
444   !
445#  define DIM_2d
446#     define ROUTINE_BDY           mpp_lnk_bdy_2d
447#     include "mpp_bdy_generic.h90"
448#     undef ROUTINE_BDY
449#  undef DIM_2d
450   !
451   !                       !==  3D array and array of 3D pointer  ==!
452   !
453#  define DIM_3d
454#     define ROUTINE_BDY           mpp_lnk_bdy_3d
455#     include "mpp_bdy_generic.h90"
456#     undef ROUTINE_BDY
457#  undef DIM_3d
458   !
459   !                       !==  4D array and array of 4D pointer  ==!
460   !
461#  define DIM_4d
462#     define ROUTINE_BDY           mpp_lnk_bdy_4d
463#     include "mpp_bdy_generic.h90"
464#     undef ROUTINE_BDY
465#  undef DIM_4d
466
467   !!----------------------------------------------------------------------
468   !!
469   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
470   
471   
472   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
473   
474   
475   !!----------------------------------------------------------------------
476
477
478
479   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
480      !!----------------------------------------------------------------------
481      !!                  ***  routine mppsend  ***
482      !!
483      !! ** Purpose :   Send messag passing array
484      !!
485      !!----------------------------------------------------------------------
486      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
487      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
488      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
489      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
490      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
491      !!
492      INTEGER ::   iflag
493      !!----------------------------------------------------------------------
494      !
495      SELECT CASE ( cn_mpi_send )
496      CASE ( 'S' )                ! Standard mpi send (blocking)
497         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
498      CASE ( 'B' )                ! Buffer mpi send (blocking)
499         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
500      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
501         ! be carefull, one more argument here : the mpi request identifier..
502         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
503      END SELECT
504      !
505   END SUBROUTINE mppsend
506
507
508   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
509      !!----------------------------------------------------------------------
510      !!                  ***  routine mpprecv  ***
511      !!
512      !! ** Purpose :   Receive messag passing array
513      !!
514      !!----------------------------------------------------------------------
515      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
516      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
517      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
518      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
519      !!
520      INTEGER :: istatus(mpi_status_size)
521      INTEGER :: iflag
522      INTEGER :: use_source
523      !!----------------------------------------------------------------------
524      !
525      ! If a specific process number has been passed to the receive call,
526      ! use that one. Default is to use mpi_any_source
527      use_source = mpi_any_source
528      IF( PRESENT(ksource) )   use_source = ksource
529      !
530      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
531      !
532   END SUBROUTINE mpprecv
533
534
535   SUBROUTINE mppgather( ptab, kp, pio )
536      !!----------------------------------------------------------------------
537      !!                   ***  routine mppgather  ***
538      !!
539      !! ** Purpose :   Transfert between a local subdomain array and a work
540      !!     array which is distributed following the vertical level.
541      !!
542      !!----------------------------------------------------------------------
543      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
544      INTEGER                           , INTENT(in   ) ::   kp     ! record length
545      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
546      !!
547      INTEGER :: itaille, ierror   ! temporary integer
548      !!---------------------------------------------------------------------
549      !
550      itaille = jpi * jpj
551      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
552         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
553      !
554   END SUBROUTINE mppgather
555
556
557   SUBROUTINE mppscatter( pio, kp, ptab )
558      !!----------------------------------------------------------------------
559      !!                  ***  routine mppscatter  ***
560      !!
561      !! ** Purpose :   Transfert between awork array which is distributed
562      !!      following the vertical level and the local subdomain array.
563      !!
564      !!----------------------------------------------------------------------
565      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
566      INTEGER                             ::   kp     ! Tag (not used with MPI
567      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
568      !!
569      INTEGER :: itaille, ierror   ! temporary integer
570      !!---------------------------------------------------------------------
571      !
572      itaille = jpi * jpj
573      !
574      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
575         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
576      !
577   END SUBROUTINE mppscatter
578
579   
580   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
581     !!----------------------------------------------------------------------
582      !!                   ***  routine mpp_delay_sum  ***
583      !!
584      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
585      !!
586      !!----------------------------------------------------------------------
587      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
588      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
589      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
590      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
591      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
592      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
593      !!
594      INTEGER ::   ji, isz
595      INTEGER ::   idvar
596      INTEGER ::   ierr, ilocalcomm
597      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
598      !!----------------------------------------------------------------------
599      ilocalcomm = mpi_comm_oce
600      IF( PRESENT(kcom) )   ilocalcomm = kcom
601
602      isz = SIZE(y_in)
603     
604      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
605
606      idvar = -1
607      DO ji = 1, nbdelay
608         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
609      END DO
610      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
611
612      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
613         !                                       --------------------------
614         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
615            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
616            DEALLOCATE(todelay(idvar)%z1d)
617            ndelayid(idvar) = -1                                      ! do as if we had no restart
618         ELSE
619            ALLOCATE(todelay(idvar)%y1d(isz))
620            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
621         END IF
622      ENDIF
623     
624      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
625         !                                       --------------------------
626         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
627         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
628         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
629      ENDIF
630
631      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
632
633      ! send back pout from todelay(idvar)%z1d defined at previous call
634      pout(:) = todelay(idvar)%z1d(:)
635
636      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
637#if defined key_mpi2
638      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
639      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
640      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
641#else
642      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
643#endif
644
645   END SUBROUTINE mpp_delay_sum
646
647   
648   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
649      !!----------------------------------------------------------------------
650      !!                   ***  routine mpp_delay_max  ***
651      !!
652      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
653      !!
654      !!----------------------------------------------------------------------
655      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
656      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
657      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
658      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
659      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
660      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
661      !!
662      INTEGER ::   ji, isz
663      INTEGER ::   idvar
664      INTEGER ::   ierr, ilocalcomm
665      !!----------------------------------------------------------------------
666      ilocalcomm = mpi_comm_oce
667      IF( PRESENT(kcom) )   ilocalcomm = kcom
668
669      isz = SIZE(p_in)
670
671      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
672
673      idvar = -1
674      DO ji = 1, nbdelay
675         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
676      END DO
677      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
678
679      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
680         !                                       --------------------------
681         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
682            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
683            DEALLOCATE(todelay(idvar)%z1d)
684            ndelayid(idvar) = -1                                      ! do as if we had no restart
685         END IF
686      ENDIF
687
688      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
689         !                                       --------------------------
690         ALLOCATE(todelay(idvar)%z1d(isz))
691         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
692      ENDIF
693
694      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
695
696      ! send back pout from todelay(idvar)%z1d defined at previous call
697      pout(:) = todelay(idvar)%z1d(:)
698
699      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
700#if defined key_mpi2
701      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
702      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
703      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
704#else
705      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
706#endif
707
708   END SUBROUTINE mpp_delay_max
709
710   
711   SUBROUTINE mpp_delay_rcv( kid )
712      !!----------------------------------------------------------------------
713      !!                   ***  routine mpp_delay_rcv  ***
714      !!
715      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
716      !!
717      !!----------------------------------------------------------------------
718      INTEGER,INTENT(in   )      ::  kid 
719      INTEGER ::   ierr
720      !!----------------------------------------------------------------------
721      IF( ndelayid(kid) /= -2 ) THEN 
722#if ! defined key_mpi2
723         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
724         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
725         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
726#endif
727         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
728         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
729      ENDIF
730   END SUBROUTINE mpp_delay_rcv
731
732   
733   !!----------------------------------------------------------------------
734   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
735   !!   
736   !!----------------------------------------------------------------------
737   !!
738#  define OPERATION_MAX
739#  define INTEGER_TYPE
740#  define DIM_0d
741#     define ROUTINE_ALLREDUCE           mppmax_int
742#     include "mpp_allreduce_generic.h90"
743#     undef ROUTINE_ALLREDUCE
744#  undef DIM_0d
745#  define DIM_1d
746#     define ROUTINE_ALLREDUCE           mppmax_a_int
747#     include "mpp_allreduce_generic.h90"
748#     undef ROUTINE_ALLREDUCE
749#  undef DIM_1d
750#  undef INTEGER_TYPE
751!
752#  define REAL_TYPE
753#  define DIM_0d
754#     define ROUTINE_ALLREDUCE           mppmax_real
755#     include "mpp_allreduce_generic.h90"
756#     undef ROUTINE_ALLREDUCE
757#  undef DIM_0d
758#  define DIM_1d
759#     define ROUTINE_ALLREDUCE           mppmax_a_real
760#     include "mpp_allreduce_generic.h90"
761#     undef ROUTINE_ALLREDUCE
762#  undef DIM_1d
763#  undef REAL_TYPE
764#  undef OPERATION_MAX
765   !!----------------------------------------------------------------------
766   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
767   !!   
768   !!----------------------------------------------------------------------
769   !!
770#  define OPERATION_MIN
771#  define INTEGER_TYPE
772#  define DIM_0d
773#     define ROUTINE_ALLREDUCE           mppmin_int
774#     include "mpp_allreduce_generic.h90"
775#     undef ROUTINE_ALLREDUCE
776#  undef DIM_0d
777#  define DIM_1d
778#     define ROUTINE_ALLREDUCE           mppmin_a_int
779#     include "mpp_allreduce_generic.h90"
780#     undef ROUTINE_ALLREDUCE
781#  undef DIM_1d
782#  undef INTEGER_TYPE
783!
784#  define REAL_TYPE
785#  define DIM_0d
786#     define ROUTINE_ALLREDUCE           mppmin_real
787#     include "mpp_allreduce_generic.h90"
788#     undef ROUTINE_ALLREDUCE
789#  undef DIM_0d
790#  define DIM_1d
791#     define ROUTINE_ALLREDUCE           mppmin_a_real
792#     include "mpp_allreduce_generic.h90"
793#     undef ROUTINE_ALLREDUCE
794#  undef DIM_1d
795#  undef REAL_TYPE
796#  undef OPERATION_MIN
797
798   !!----------------------------------------------------------------------
799   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
800   !!   
801   !!   Global sum of 1D array or a variable (integer, real or complex)
802   !!----------------------------------------------------------------------
803   !!
804#  define OPERATION_SUM
805#  define INTEGER_TYPE
806#  define DIM_0d
807#     define ROUTINE_ALLREDUCE           mppsum_int
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_0d
811#  define DIM_1d
812#     define ROUTINE_ALLREDUCE           mppsum_a_int
813#     include "mpp_allreduce_generic.h90"
814#     undef ROUTINE_ALLREDUCE
815#  undef DIM_1d
816#  undef INTEGER_TYPE
817!
818#  define REAL_TYPE
819#  define DIM_0d
820#     define ROUTINE_ALLREDUCE           mppsum_real
821#     include "mpp_allreduce_generic.h90"
822#     undef ROUTINE_ALLREDUCE
823#  undef DIM_0d
824#  define DIM_1d
825#     define ROUTINE_ALLREDUCE           mppsum_a_real
826#     include "mpp_allreduce_generic.h90"
827#     undef ROUTINE_ALLREDUCE
828#  undef DIM_1d
829#  undef REAL_TYPE
830#  undef OPERATION_SUM
831
832#  define OPERATION_SUM_DD
833#  define COMPLEX_TYPE
834#  define DIM_0d
835#     define ROUTINE_ALLREDUCE           mppsum_realdd
836#     include "mpp_allreduce_generic.h90"
837#     undef ROUTINE_ALLREDUCE
838#  undef DIM_0d
839#  define DIM_1d
840#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
841#     include "mpp_allreduce_generic.h90"
842#     undef ROUTINE_ALLREDUCE
843#  undef DIM_1d
844#  undef COMPLEX_TYPE
845#  undef OPERATION_SUM_DD
846
847   !!----------------------------------------------------------------------
848   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
849   !!   
850   !!----------------------------------------------------------------------
851   !!
852#  define OPERATION_MINLOC
853#  define DIM_2d
854#     define ROUTINE_LOC           mpp_minloc2d
855#     include "mpp_loc_generic.h90"
856#     undef ROUTINE_LOC
857#  undef DIM_2d
858#  define DIM_3d
859#     define ROUTINE_LOC           mpp_minloc3d
860#     include "mpp_loc_generic.h90"
861#     undef ROUTINE_LOC
862#  undef DIM_3d
863#  undef OPERATION_MINLOC
864
865#  define OPERATION_MAXLOC
866#  define DIM_2d
867#     define ROUTINE_LOC           mpp_maxloc2d
868#     include "mpp_loc_generic.h90"
869#     undef ROUTINE_LOC
870#  undef DIM_2d
871#  define DIM_3d
872#     define ROUTINE_LOC           mpp_maxloc3d
873#     include "mpp_loc_generic.h90"
874#     undef ROUTINE_LOC
875#  undef DIM_3d
876#  undef OPERATION_MAXLOC
877
878   SUBROUTINE mppsync()
879      !!----------------------------------------------------------------------
880      !!                  ***  routine mppsync  ***
881      !!
882      !! ** Purpose :   Massively parallel processors, synchroneous
883      !!
884      !!-----------------------------------------------------------------------
885      INTEGER :: ierror
886      !!-----------------------------------------------------------------------
887      !
888      CALL mpi_barrier( mpi_comm_oce, ierror )
889      !
890   END SUBROUTINE mppsync
891
892
893   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
894      !!----------------------------------------------------------------------
895      !!                  ***  routine mppstop  ***
896      !!
897      !! ** purpose :   Stop massively parallel processors method
898      !!
899      !!----------------------------------------------------------------------
900      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
901      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
902      LOGICAL ::   llfinal, ll_force_abort
903      INTEGER ::   info
904      !!----------------------------------------------------------------------
905      llfinal = .FALSE.
906      IF( PRESENT(ldfinal) ) llfinal = ldfinal
907      ll_force_abort = .FALSE.
908      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
909      !
910      IF(ll_force_abort) THEN
911         CALL mpi_abort( MPI_COMM_WORLD )
912      ELSE
913         CALL mppsync
914         CALL mpi_finalize( info )
915      ENDIF
916      IF( .NOT. llfinal ) STOP 123456
917      !
918   END SUBROUTINE mppstop
919
920
921   SUBROUTINE mpp_comm_free( kcom )
922      !!----------------------------------------------------------------------
923      INTEGER, INTENT(in) ::   kcom
924      !!
925      INTEGER :: ierr
926      !!----------------------------------------------------------------------
927      !
928      CALL MPI_COMM_FREE(kcom, ierr)
929      !
930   END SUBROUTINE mpp_comm_free
931
932
933   SUBROUTINE mpp_ini_znl( kumout )
934      !!----------------------------------------------------------------------
935      !!               ***  routine mpp_ini_znl  ***
936      !!
937      !! ** Purpose :   Initialize special communicator for computing zonal sum
938      !!
939      !! ** Method  : - Look for processors in the same row
940      !!              - Put their number in nrank_znl
941      !!              - Create group for the znl processors
942      !!              - Create a communicator for znl processors
943      !!              - Determine if processor should write znl files
944      !!
945      !! ** output
946      !!      ndim_rank_znl = number of processors on the same row
947      !!      ngrp_znl = group ID for the znl processors
948      !!      ncomm_znl = communicator for the ice procs.
949      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
950      !!
951      !!----------------------------------------------------------------------
952      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
953      !
954      INTEGER :: jproc      ! dummy loop integer
955      INTEGER :: ierr, ii   ! local integer
956      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
957      !!----------------------------------------------------------------------
958      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
959      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
960      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
961      !
962      ALLOCATE( kwork(jpnij), STAT=ierr )
963      IF( ierr /= 0 ) THEN
964         WRITE(kumout, cform_err)
965         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
966         CALL mppstop
967      ENDIF
968
969      IF( jpnj == 1 ) THEN
970         ngrp_znl  = ngrp_world
971         ncomm_znl = mpi_comm_oce
972      ELSE
973         !
974         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
975         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
976         !-$$        CALL flush(numout)
977         !
978         ! Count number of processors on the same row
979         ndim_rank_znl = 0
980         DO jproc=1,jpnij
981            IF ( kwork(jproc) == njmpp ) THEN
982               ndim_rank_znl = ndim_rank_znl + 1
983            ENDIF
984         END DO
985         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
986         !-$$        CALL flush(numout)
987         ! Allocate the right size to nrank_znl
988         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
989         ALLOCATE(nrank_znl(ndim_rank_znl))
990         ii = 0
991         nrank_znl (:) = 0
992         DO jproc=1,jpnij
993            IF ( kwork(jproc) == njmpp) THEN
994               ii = ii + 1
995               nrank_znl(ii) = jproc -1
996            ENDIF
997         END DO
998         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
999         !-$$        CALL flush(numout)
1000
1001         ! Create the opa group
1002         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1003         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1004         !-$$        CALL flush(numout)
1005
1006         ! Create the znl group from the opa group
1007         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1008         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1009         !-$$        CALL flush(numout)
1010
1011         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1012         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1013         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1014         !-$$        CALL flush(numout)
1015         !
1016      END IF
1017
1018      ! Determines if processor if the first (starting from i=1) on the row
1019      IF ( jpni == 1 ) THEN
1020         l_znl_root = .TRUE.
1021      ELSE
1022         l_znl_root = .FALSE.
1023         kwork (1) = nimpp
1024         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1025         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1026      END IF
1027
1028      DEALLOCATE(kwork)
1029
1030   END SUBROUTINE mpp_ini_znl
1031
1032
1033   SUBROUTINE mpp_ini_north
1034      !!----------------------------------------------------------------------
1035      !!               ***  routine mpp_ini_north  ***
1036      !!
1037      !! ** Purpose :   Initialize special communicator for north folding
1038      !!      condition together with global variables needed in the mpp folding
1039      !!
1040      !! ** Method  : - Look for northern processors
1041      !!              - Put their number in nrank_north
1042      !!              - Create groups for the world processors and the north processors
1043      !!              - Create a communicator for northern processors
1044      !!
1045      !! ** output
1046      !!      njmppmax = njmpp for northern procs
1047      !!      ndim_rank_north = number of processors in the northern line
1048      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1049      !!      ngrp_world = group ID for the world processors
1050      !!      ngrp_north = group ID for the northern processors
1051      !!      ncomm_north = communicator for the northern procs.
1052      !!      north_root = number (in the world) of proc 0 in the northern comm.
1053      !!
1054      !!----------------------------------------------------------------------
1055      INTEGER ::   ierr
1056      INTEGER ::   jjproc
1057      INTEGER ::   ii, ji
1058      !!----------------------------------------------------------------------
1059      !
1060      njmppmax = MAXVAL( njmppt )
1061      !
1062      ! Look for how many procs on the northern boundary
1063      ndim_rank_north = 0
1064      DO jjproc = 1, jpnij
1065         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1066      END DO
1067      !
1068      ! Allocate the right size to nrank_north
1069      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1070      ALLOCATE( nrank_north(ndim_rank_north) )
1071
1072      ! Fill the nrank_north array with proc. number of northern procs.
1073      ! Note : the rank start at 0 in MPI
1074      ii = 0
1075      DO ji = 1, jpnij
1076         IF ( njmppt(ji) == njmppmax   ) THEN
1077            ii=ii+1
1078            nrank_north(ii)=ji-1
1079         END IF
1080      END DO
1081      !
1082      ! create the world group
1083      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1084      !
1085      ! Create the North group from the world group
1086      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1087      !
1088      ! Create the North communicator , ie the pool of procs in the north group
1089      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1090      !
1091   END SUBROUTINE mpp_ini_north
1092
1093
1094   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1095      !!---------------------------------------------------------------------
1096      !!                   ***  routine mpp_init.opa  ***
1097      !!
1098      !! ** Purpose :: export and attach a MPI buffer for bsend
1099      !!
1100      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1101      !!            but classical mpi_init
1102      !!
1103      !! History :: 01/11 :: IDRIS initial version for IBM only
1104      !!            08/04 :: R. Benshila, generalisation
1105      !!---------------------------------------------------------------------
1106      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1107      INTEGER                      , INTENT(inout) ::   ksft
1108      INTEGER                      , INTENT(  out) ::   code
1109      INTEGER                                      ::   ierr, ji
1110      LOGICAL                                      ::   mpi_was_called
1111      !!---------------------------------------------------------------------
1112      !
1113      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1114      IF ( code /= MPI_SUCCESS ) THEN
1115         DO ji = 1, SIZE(ldtxt)
1116            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1117         END DO
1118         WRITE(*, cform_err)
1119         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1120         CALL mpi_abort( mpi_comm_world, code, ierr )
1121      ENDIF
1122      !
1123      IF( .NOT. mpi_was_called ) THEN
1124         CALL mpi_init( code )
1125         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1126         IF ( code /= MPI_SUCCESS ) THEN
1127            DO ji = 1, SIZE(ldtxt)
1128               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1129            END DO
1130            WRITE(*, cform_err)
1131            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1132            CALL mpi_abort( mpi_comm_world, code, ierr )
1133         ENDIF
1134      ENDIF
1135      !
1136      IF( nn_buffer > 0 ) THEN
1137         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1138         ! Buffer allocation and attachment
1139         ALLOCATE( tampon(nn_buffer), stat = ierr )
1140         IF( ierr /= 0 ) THEN
1141            DO ji = 1, SIZE(ldtxt)
1142               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1143            END DO
1144            WRITE(*, cform_err)
1145            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1146            CALL mpi_abort( mpi_comm_world, code, ierr )
1147         END IF
1148         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1149      ENDIF
1150      !
1151   END SUBROUTINE mpi_init_oce
1152
1153
1154   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1155      !!---------------------------------------------------------------------
1156      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1157      !!
1158      !!   Modification of original codes written by David H. Bailey
1159      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1160      !!---------------------------------------------------------------------
1161      INTEGER                     , INTENT(in)    ::   ilen, itype
1162      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1163      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1164      !
1165      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1166      INTEGER  :: ji, ztmp           ! local scalar
1167      !!---------------------------------------------------------------------
1168      !
1169      ztmp = itype   ! avoid compilation warning
1170      !
1171      DO ji=1,ilen
1172      ! Compute ydda + yddb using Knuth's trick.
1173         zt1  = real(ydda(ji)) + real(yddb(ji))
1174         zerr = zt1 - real(ydda(ji))
1175         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1176                + aimag(ydda(ji)) + aimag(yddb(ji))
1177
1178         ! The result is zt1 + zt2, after normalization.
1179         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1180      END DO
1181      !
1182   END SUBROUTINE DDPDD_MPI
1183
1184   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1185      !!----------------------------------------------------------------------
1186      !!                  ***  routine mpp_report  ***
1187      !!
1188      !! ** Purpose :   report use of mpp routines per time-setp
1189      !!
1190      !!----------------------------------------------------------------------
1191      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1192      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1193      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1194      !!
1195      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1196      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1197      !!----------------------------------------------------------------------
1198      !
1199      ll_lbc = .FALSE.
1200      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1201      ll_glb = .FALSE.
1202      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1203      ll_dlg = .FALSE.
1204      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1205      !
1206      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1207      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1208      ncom_freq = ncom_fsbc
1209      !
1210      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1211         IF( ll_lbc ) THEN
1212            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1213            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1214            n_sequence_lbc = n_sequence_lbc + 1
1215            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1216            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1217            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1218            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1219         ENDIF
1220         IF( ll_glb ) THEN
1221            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1222            n_sequence_glb = n_sequence_glb + 1
1223            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1224            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1225         ENDIF
1226         IF( ll_dlg ) THEN
1227            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1228            n_sequence_dlg = n_sequence_dlg + 1
1229            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1230            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1231         ENDIF
1232      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1233         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1234         WRITE(numcom,*) ' '
1235         WRITE(numcom,*) ' ------------------------------------------------------------'
1236         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1237         WRITE(numcom,*) ' ------------------------------------------------------------'
1238         WRITE(numcom,*) ' '
1239         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1240         jj = 0; jk = 0; jf = 0; jh = 0
1241         DO ji = 1, n_sequence_lbc
1242            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1243            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1244            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1245            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1246         END DO
1247         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1248         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1249         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1250         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1251         WRITE(numcom,*) ' '
1252         WRITE(numcom,*) ' lbc_lnk called'
1253         jj = 1
1254         DO ji = 2, n_sequence_lbc
1255            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1256               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1257               jj = 0
1258            END IF
1259            jj = jj + 1 
1260         END DO
1261         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1262         WRITE(numcom,*) ' '
1263         IF ( n_sequence_glb > 0 ) THEN
1264            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1265            jj = 1
1266            DO ji = 2, n_sequence_glb
1267               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1268                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1269                  jj = 0
1270               END IF
1271               jj = jj + 1 
1272            END DO
1273            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1274            DEALLOCATE(crname_glb)
1275         ELSE
1276            WRITE(numcom,*) ' No MPI global communication '
1277         ENDIF
1278         WRITE(numcom,*) ' '
1279         IF ( n_sequence_dlg > 0 ) THEN
1280            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1281            jj = 1
1282            DO ji = 2, n_sequence_dlg
1283               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1284                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1285                  jj = 0
1286               END IF
1287               jj = jj + 1 
1288            END DO
1289            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1290            DEALLOCATE(crname_dlg)
1291         ELSE
1292            WRITE(numcom,*) ' No MPI delayed global communication '
1293         ENDIF
1294         WRITE(numcom,*) ' '
1295         WRITE(numcom,*) ' -----------------------------------------------'
1296         WRITE(numcom,*) ' '
1297         DEALLOCATE(ncomm_sequence)
1298         DEALLOCATE(crname_lbc)
1299      ENDIF
1300   END SUBROUTINE mpp_report
1301
1302   
1303   SUBROUTINE tic_tac (ld_tic, ld_global)
1304
1305    LOGICAL,           INTENT(IN) :: ld_tic
1306    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1307    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1308    REAL(wp),               SAVE :: tic_ct = 0._wp
1309    INTEGER :: ii
1310
1311    IF( ncom_stp <= nit000 ) RETURN
1312    IF( ncom_stp == nitend ) RETURN
1313    ii = 1
1314    IF( PRESENT( ld_global ) ) THEN
1315       IF( ld_global ) ii = 2
1316    END IF
1317   
1318    IF ( ld_tic ) THEN
1319       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1320       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1321    ELSE
1322       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1323       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1324    ENDIF
1325   
1326   END SUBROUTINE tic_tac
1327
1328   
1329#else
1330   !!----------------------------------------------------------------------
1331   !!   Default case:            Dummy module        share memory computing
1332   !!----------------------------------------------------------------------
1333   USE in_out_manager
1334
1335   INTERFACE mpp_sum
1336      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1337   END INTERFACE
1338   INTERFACE mpp_max
1339      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1340   END INTERFACE
1341   INTERFACE mpp_min
1342      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1343   END INTERFACE
1344   INTERFACE mpp_minloc
1345      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1346   END INTERFACE
1347   INTERFACE mpp_maxloc
1348      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1349   END INTERFACE
1350
1351   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1352   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1353   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1354
1355   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1356   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1357   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1358   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1359   TYPE ::   DELAYARR
1360      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1361      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1362   END TYPE DELAYARR
1363   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1364   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1365   !!----------------------------------------------------------------------
1366CONTAINS
1367
1368   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1369      INTEGER, INTENT(in) ::   kumout
1370      lib_mpp_alloc = 0
1371   END FUNCTION lib_mpp_alloc
1372
1373   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1374      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1375      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1376      CHARACTER(len=*) ::   ldname
1377      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1378      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1379      function_value = 0
1380      IF( .FALSE. )   ldtxt(:) = 'never done'
1381      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1382   END FUNCTION mynode
1383
1384   SUBROUTINE mppsync                       ! Dummy routine
1385   END SUBROUTINE mppsync
1386
1387   !!----------------------------------------------------------------------
1388   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1389   !!   
1390   !!----------------------------------------------------------------------
1391   !!
1392#  define OPERATION_MAX
1393#  define INTEGER_TYPE
1394#  define DIM_0d
1395#     define ROUTINE_ALLREDUCE           mppmax_int
1396#     include "mpp_allreduce_generic.h90"
1397#     undef ROUTINE_ALLREDUCE
1398#  undef DIM_0d
1399#  define DIM_1d
1400#     define ROUTINE_ALLREDUCE           mppmax_a_int
1401#     include "mpp_allreduce_generic.h90"
1402#     undef ROUTINE_ALLREDUCE
1403#  undef DIM_1d
1404#  undef INTEGER_TYPE
1405!
1406#  define REAL_TYPE
1407#  define DIM_0d
1408#     define ROUTINE_ALLREDUCE           mppmax_real
1409#     include "mpp_allreduce_generic.h90"
1410#     undef ROUTINE_ALLREDUCE
1411#  undef DIM_0d
1412#  define DIM_1d
1413#     define ROUTINE_ALLREDUCE           mppmax_a_real
1414#     include "mpp_allreduce_generic.h90"
1415#     undef ROUTINE_ALLREDUCE
1416#  undef DIM_1d
1417#  undef REAL_TYPE
1418#  undef OPERATION_MAX
1419   !!----------------------------------------------------------------------
1420   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1421   !!   
1422   !!----------------------------------------------------------------------
1423   !!
1424#  define OPERATION_MIN
1425#  define INTEGER_TYPE
1426#  define DIM_0d
1427#     define ROUTINE_ALLREDUCE           mppmin_int
1428#     include "mpp_allreduce_generic.h90"
1429#     undef ROUTINE_ALLREDUCE
1430#  undef DIM_0d
1431#  define DIM_1d
1432#     define ROUTINE_ALLREDUCE           mppmin_a_int
1433#     include "mpp_allreduce_generic.h90"
1434#     undef ROUTINE_ALLREDUCE
1435#  undef DIM_1d
1436#  undef INTEGER_TYPE
1437!
1438#  define REAL_TYPE
1439#  define DIM_0d
1440#     define ROUTINE_ALLREDUCE           mppmin_real
1441#     include "mpp_allreduce_generic.h90"
1442#     undef ROUTINE_ALLREDUCE
1443#  undef DIM_0d
1444#  define DIM_1d
1445#     define ROUTINE_ALLREDUCE           mppmin_a_real
1446#     include "mpp_allreduce_generic.h90"
1447#     undef ROUTINE_ALLREDUCE
1448#  undef DIM_1d
1449#  undef REAL_TYPE
1450#  undef OPERATION_MIN
1451
1452   !!----------------------------------------------------------------------
1453   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1454   !!   
1455   !!   Global sum of 1D array or a variable (integer, real or complex)
1456   !!----------------------------------------------------------------------
1457   !!
1458#  define OPERATION_SUM
1459#  define INTEGER_TYPE
1460#  define DIM_0d
1461#     define ROUTINE_ALLREDUCE           mppsum_int
1462#     include "mpp_allreduce_generic.h90"
1463#     undef ROUTINE_ALLREDUCE
1464#  undef DIM_0d
1465#  define DIM_1d
1466#     define ROUTINE_ALLREDUCE           mppsum_a_int
1467#     include "mpp_allreduce_generic.h90"
1468#     undef ROUTINE_ALLREDUCE
1469#  undef DIM_1d
1470#  undef INTEGER_TYPE
1471!
1472#  define REAL_TYPE
1473#  define DIM_0d
1474#     define ROUTINE_ALLREDUCE           mppsum_real
1475#     include "mpp_allreduce_generic.h90"
1476#     undef ROUTINE_ALLREDUCE
1477#  undef DIM_0d
1478#  define DIM_1d
1479#     define ROUTINE_ALLREDUCE           mppsum_a_real
1480#     include "mpp_allreduce_generic.h90"
1481#     undef ROUTINE_ALLREDUCE
1482#  undef DIM_1d
1483#  undef REAL_TYPE
1484#  undef OPERATION_SUM
1485
1486#  define OPERATION_SUM_DD
1487#  define COMPLEX_TYPE
1488#  define DIM_0d
1489#     define ROUTINE_ALLREDUCE           mppsum_realdd
1490#     include "mpp_allreduce_generic.h90"
1491#     undef ROUTINE_ALLREDUCE
1492#  undef DIM_0d
1493#  define DIM_1d
1494#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1495#     include "mpp_allreduce_generic.h90"
1496#     undef ROUTINE_ALLREDUCE
1497#  undef DIM_1d
1498#  undef COMPLEX_TYPE
1499#  undef OPERATION_SUM_DD
1500
1501   !!----------------------------------------------------------------------
1502   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1503   !!   
1504   !!----------------------------------------------------------------------
1505   !!
1506#  define OPERATION_MINLOC
1507#  define DIM_2d
1508#     define ROUTINE_LOC           mpp_minloc2d
1509#     include "mpp_loc_generic.h90"
1510#     undef ROUTINE_LOC
1511#  undef DIM_2d
1512#  define DIM_3d
1513#     define ROUTINE_LOC           mpp_minloc3d
1514#     include "mpp_loc_generic.h90"
1515#     undef ROUTINE_LOC
1516#  undef DIM_3d
1517#  undef OPERATION_MINLOC
1518
1519#  define OPERATION_MAXLOC
1520#  define DIM_2d
1521#     define ROUTINE_LOC           mpp_maxloc2d
1522#     include "mpp_loc_generic.h90"
1523#     undef ROUTINE_LOC
1524#  undef DIM_2d
1525#  define DIM_3d
1526#     define ROUTINE_LOC           mpp_maxloc3d
1527#     include "mpp_loc_generic.h90"
1528#     undef ROUTINE_LOC
1529#  undef DIM_3d
1530#  undef OPERATION_MAXLOC
1531
1532   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1533      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1534      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1535      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1536      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1537      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1538      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1539      !
1540      pout(:) = REAL(y_in(:), wp)
1541   END SUBROUTINE mpp_delay_sum
1542
1543   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1544      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1545      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1546      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1547      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1548      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1549      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1550      !
1551      pout(:) = p_in(:)
1552   END SUBROUTINE mpp_delay_max
1553
1554   SUBROUTINE mpp_delay_rcv( kid )
1555      INTEGER,INTENT(in   )      ::  kid 
1556      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1557   END SUBROUTINE mpp_delay_rcv
1558   
1559   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1560      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1561      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1562      STOP      ! non MPP case, just stop the run
1563   END SUBROUTINE mppstop
1564
1565   SUBROUTINE mpp_ini_znl( knum )
1566      INTEGER :: knum
1567      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1568   END SUBROUTINE mpp_ini_znl
1569
1570   SUBROUTINE mpp_comm_free( kcom )
1571      INTEGER :: kcom
1572      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1573   END SUBROUTINE mpp_comm_free
1574   
1575#endif
1576
1577   !!----------------------------------------------------------------------
1578   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1579   !!----------------------------------------------------------------------
1580
1581   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1582      &                 cd6, cd7, cd8, cd9, cd10 )
1583      !!----------------------------------------------------------------------
1584      !!                  ***  ROUTINE  stop_opa  ***
1585      !!
1586      !! ** Purpose :   print in ocean.outpput file a error message and
1587      !!                increment the error number (nstop) by one.
1588      !!----------------------------------------------------------------------
1589      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1590      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1591      !!----------------------------------------------------------------------
1592      !
1593      nstop = nstop + 1
1594
1595      ! force to open ocean.output file
1596      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1597       
1598      WRITE(numout,cform_err)
1599      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1600      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1601      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1602      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1603      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1604      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1605      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1606      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1607      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1608      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1609
1610                               CALL FLUSH(numout    )
1611      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1612      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1613      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1614      !
1615      IF( cd1 == 'STOP' ) THEN
1616         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1617         CALL mppstop(ld_force_abort = .true.)
1618      ENDIF
1619      !
1620   END SUBROUTINE ctl_stop
1621
1622
1623   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1624      &                 cd6, cd7, cd8, cd9, cd10 )
1625      !!----------------------------------------------------------------------
1626      !!                  ***  ROUTINE  stop_warn  ***
1627      !!
1628      !! ** Purpose :   print in ocean.outpput file a error message and
1629      !!                increment the warning number (nwarn) by one.
1630      !!----------------------------------------------------------------------
1631      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1632      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1633      !!----------------------------------------------------------------------
1634      !
1635      nwarn = nwarn + 1
1636      IF(lwp) THEN
1637         WRITE(numout,cform_war)
1638         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1639         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1640         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1641         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1642         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1643         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1644         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1645         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1646         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1647         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1648      ENDIF
1649      CALL FLUSH(numout)
1650      !
1651   END SUBROUTINE ctl_warn
1652
1653
1654   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1655      !!----------------------------------------------------------------------
1656      !!                  ***  ROUTINE ctl_opn  ***
1657      !!
1658      !! ** Purpose :   Open file and check if required file is available.
1659      !!
1660      !! ** Method  :   Fortan open
1661      !!----------------------------------------------------------------------
1662      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1663      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1664      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1665      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1666      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1667      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1668      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1669      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1670      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1671      !
1672      CHARACTER(len=80) ::   clfile
1673      INTEGER           ::   iost
1674      !!----------------------------------------------------------------------
1675      !
1676      ! adapt filename
1677      ! ----------------
1678      clfile = TRIM(cdfile)
1679      IF( PRESENT( karea ) ) THEN
1680         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1681      ENDIF
1682#if defined key_agrif
1683      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1684      knum=Agrif_Get_Unit()
1685#else
1686      knum=get_unit()
1687#endif
1688      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1689      !
1690      iost=0
1691      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1692         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1693      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1694         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1695      ELSE
1696         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1697      ENDIF
1698      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1699         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1700      IF( iost == 0 ) THEN
1701         IF(ldwp) THEN
1702            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1703            WRITE(kout,*) '     unit   = ', knum
1704            WRITE(kout,*) '     status = ', cdstat
1705            WRITE(kout,*) '     form   = ', cdform
1706            WRITE(kout,*) '     access = ', cdacce
1707            WRITE(kout,*)
1708         ENDIF
1709      ENDIF
1710100   CONTINUE
1711      IF( iost /= 0 ) THEN
1712         IF(ldwp) THEN
1713            WRITE(kout,*)
1714            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1715            WRITE(kout,*) ' =======   ===  '
1716            WRITE(kout,*) '           unit   = ', knum
1717            WRITE(kout,*) '           status = ', cdstat
1718            WRITE(kout,*) '           form   = ', cdform
1719            WRITE(kout,*) '           access = ', cdacce
1720            WRITE(kout,*) '           iostat = ', iost
1721            WRITE(kout,*) '           we stop. verify the file '
1722            WRITE(kout,*)
1723         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1724            WRITE(*,*)
1725            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1726            WRITE(*,*) ' =======   ===  '
1727            WRITE(*,*) '           unit   = ', knum
1728            WRITE(*,*) '           status = ', cdstat
1729            WRITE(*,*) '           form   = ', cdform
1730            WRITE(*,*) '           access = ', cdacce
1731            WRITE(*,*) '           iostat = ', iost
1732            WRITE(*,*) '           we stop. verify the file '
1733            WRITE(*,*)
1734         ENDIF
1735         CALL FLUSH( kout ) 
1736         STOP 'ctl_opn bad opening'
1737      ENDIF
1738      !
1739   END SUBROUTINE ctl_opn
1740
1741
1742   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1743      !!----------------------------------------------------------------------
1744      !!                  ***  ROUTINE ctl_nam  ***
1745      !!
1746      !! ** Purpose :   Informations when error while reading a namelist
1747      !!
1748      !! ** Method  :   Fortan open
1749      !!----------------------------------------------------------------------
1750      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1751      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1752      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1753      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1754      !!----------------------------------------------------------------------
1755      !
1756      WRITE (clios, '(I5.0)')   kios
1757      IF( kios < 0 ) THEN         
1758         CALL ctl_warn( 'end of record or file while reading namelist '   &
1759            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1760      ENDIF
1761      !
1762      IF( kios > 0 ) THEN
1763         CALL ctl_stop( 'misspelled variable in namelist '   &
1764            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1765      ENDIF
1766      kios = 0
1767      RETURN
1768      !
1769   END SUBROUTINE ctl_nam
1770
1771
1772   INTEGER FUNCTION get_unit()
1773      !!----------------------------------------------------------------------
1774      !!                  ***  FUNCTION  get_unit  ***
1775      !!
1776      !! ** Purpose :   return the index of an unused logical unit
1777      !!----------------------------------------------------------------------
1778      LOGICAL :: llopn
1779      !!----------------------------------------------------------------------
1780      !
1781      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1782      llopn = .TRUE.
1783      DO WHILE( (get_unit < 998) .AND. llopn )
1784         get_unit = get_unit + 1
1785         INQUIRE( unit = get_unit, opened = llopn )
1786      END DO
1787      IF( (get_unit == 999) .AND. llopn ) THEN
1788         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1789         get_unit = -1
1790      ENDIF
1791      !
1792   END FUNCTION get_unit
1793
1794   !!----------------------------------------------------------------------
1795END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.