New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/trunk/src/OCE/LBC – NEMO

source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 10437

Last change on this file since 10437 was 10437, checked in by smasson, 5 years ago

trunk: improve communication_report.txt for delayed global comm

  • Property svn:keywords set to Id
File size: 89.6 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
160   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
161   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
162   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
163   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
164   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
165   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
166   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
167   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
168   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
169   !: name (used as id) of allreduce-delayed operations
170   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb' /)
171   !: component name where the allreduce-delayed operation is performed
172   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
173   TYPE, PUBLIC ::   DELAYARR
174      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
175      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
176   END TYPE DELAYARR
177   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
178   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
179
180   ! timing summary report
181   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
182   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
183   
184   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
185
186   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
187   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
188
189   !!----------------------------------------------------------------------
190   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
191   !! $Id$
192   !! Software governed by the CeCILL license (see ./LICENSE)
193   !!----------------------------------------------------------------------
194CONTAINS
195
196   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
197      !!----------------------------------------------------------------------
198      !!                  ***  routine mynode  ***
199      !!
200      !! ** Purpose :   Find processor unit
201      !!----------------------------------------------------------------------
202      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
203      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
204      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
205      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
206      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
207      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
208      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
209      !
210      INTEGER ::   mynode, ierr, code, ji, ii, ios
211      LOGICAL ::   mpi_was_called
212      !
213      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
214      !!----------------------------------------------------------------------
215      !
216      ii = 1
217      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
220      !
221      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
222      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
223901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
224      !
225      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
226      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
227902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
228      !
229      !                              ! control print
230      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
231      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
232      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
233      !
234      IF( jpni < 1 .OR. jpnj < 1  ) THEN
235         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
236      ELSE
237         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
238         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
239      ENDIF
240
241      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
242
243      CALL mpi_initialized ( mpi_was_called, code )
244      IF( code /= MPI_SUCCESS ) THEN
245         DO ji = 1, SIZE(ldtxt)
246            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
247         END DO
248         WRITE(*, cform_err)
249         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
250         CALL mpi_abort( mpi_comm_world, code, ierr )
251      ENDIF
252
253      IF( mpi_was_called ) THEN
254         !
255         SELECT CASE ( cn_mpi_send )
256         CASE ( 'S' )                ! Standard mpi send (blocking)
257            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
258         CASE ( 'B' )                ! Buffer mpi send (blocking)
259            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
260            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
261         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
262            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
263            l_isend = .TRUE.
264         CASE DEFAULT
265            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
266            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
267            kstop = kstop + 1
268         END SELECT
269         !
270      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
271         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
272         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
273         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
274         kstop = kstop + 1
275      ELSE
276         SELECT CASE ( cn_mpi_send )
277         CASE ( 'S' )                ! Standard mpi send (blocking)
278            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
279            CALL mpi_init( ierr )
280         CASE ( 'B' )                ! Buffer mpi send (blocking)
281            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
282            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
283         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
284            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
285            l_isend = .TRUE.
286            CALL mpi_init( ierr )
287         CASE DEFAULT
288            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
289            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
290            kstop = kstop + 1
291         END SELECT
292         !
293      ENDIF
294
295      IF( PRESENT(localComm) ) THEN
296         IF( Agrif_Root() ) THEN
297            mpi_comm_oce = localComm
298         ENDIF
299      ELSE
300         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
301         IF( code /= MPI_SUCCESS ) THEN
302            DO ji = 1, SIZE(ldtxt)
303               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
304            END DO
305            WRITE(*, cform_err)
306            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
307            CALL mpi_abort( mpi_comm_world, code, ierr )
308         ENDIF
309      ENDIF
310
311#if defined key_agrif
312      IF( Agrif_Root() ) THEN
313         CALL Agrif_MPI_Init(mpi_comm_oce)
314      ELSE
315         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
316      ENDIF
317#endif
318
319      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
320      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
321      mynode = mpprank
322
323      IF( mynode == 0 ) THEN
324         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
325         WRITE(kumond, nammpp)     
326      ENDIF
327      !
328      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
329      !
330   END FUNCTION mynode
331
332   !!----------------------------------------------------------------------
333   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
334   !!
335   !!   * Argument : dummy argument use in mpp_lnk_... routines
336   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
337   !!                cd_nat :   nature of array grid-points
338   !!                psgn   :   sign used across the north fold boundary
339   !!                kfld   :   optional, number of pt3d arrays
340   !!                cd_mpp :   optional, fill the overlap area only
341   !!                pval   :   optional, background value (used at closed boundaries)
342   !!----------------------------------------------------------------------
343   !
344   !                       !==  2D array and array of 2D pointer  ==!
345   !
346#  define DIM_2d
347#     define ROUTINE_LNK           mpp_lnk_2d
348#     include "mpp_lnk_generic.h90"
349#     undef ROUTINE_LNK
350#     define MULTI
351#     define ROUTINE_LNK           mpp_lnk_2d_ptr
352#     include "mpp_lnk_generic.h90"
353#     undef ROUTINE_LNK
354#     undef MULTI
355#  undef DIM_2d
356   !
357   !                       !==  3D array and array of 3D pointer  ==!
358   !
359#  define DIM_3d
360#     define ROUTINE_LNK           mpp_lnk_3d
361#     include "mpp_lnk_generic.h90"
362#     undef ROUTINE_LNK
363#     define MULTI
364#     define ROUTINE_LNK           mpp_lnk_3d_ptr
365#     include "mpp_lnk_generic.h90"
366#     undef ROUTINE_LNK
367#     undef MULTI
368#  undef DIM_3d
369   !
370   !                       !==  4D array and array of 4D pointer  ==!
371   !
372#  define DIM_4d
373#     define ROUTINE_LNK           mpp_lnk_4d
374#     include "mpp_lnk_generic.h90"
375#     undef ROUTINE_LNK
376#     define MULTI
377#     define ROUTINE_LNK           mpp_lnk_4d_ptr
378#     include "mpp_lnk_generic.h90"
379#     undef ROUTINE_LNK
380#     undef MULTI
381#  undef DIM_4d
382
383   !!----------------------------------------------------------------------
384   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
385   !!
386   !!   * Argument : dummy argument use in mpp_nfd_... routines
387   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
388   !!                cd_nat :   nature of array grid-points
389   !!                psgn   :   sign used across the north fold boundary
390   !!                kfld   :   optional, number of pt3d arrays
391   !!                cd_mpp :   optional, fill the overlap area only
392   !!                pval   :   optional, background value (used at closed boundaries)
393   !!----------------------------------------------------------------------
394   !
395   !                       !==  2D array and array of 2D pointer  ==!
396   !
397#  define DIM_2d
398#     define ROUTINE_NFD           mpp_nfd_2d
399#     include "mpp_nfd_generic.h90"
400#     undef ROUTINE_NFD
401#     define MULTI
402#     define ROUTINE_NFD           mpp_nfd_2d_ptr
403#     include "mpp_nfd_generic.h90"
404#     undef ROUTINE_NFD
405#     undef MULTI
406#  undef DIM_2d
407   !
408   !                       !==  3D array and array of 3D pointer  ==!
409   !
410#  define DIM_3d
411#     define ROUTINE_NFD           mpp_nfd_3d
412#     include "mpp_nfd_generic.h90"
413#     undef ROUTINE_NFD
414#     define MULTI
415#     define ROUTINE_NFD           mpp_nfd_3d_ptr
416#     include "mpp_nfd_generic.h90"
417#     undef ROUTINE_NFD
418#     undef MULTI
419#  undef DIM_3d
420   !
421   !                       !==  4D array and array of 4D pointer  ==!
422   !
423#  define DIM_4d
424#     define ROUTINE_NFD           mpp_nfd_4d
425#     include "mpp_nfd_generic.h90"
426#     undef ROUTINE_NFD
427#     define MULTI
428#     define ROUTINE_NFD           mpp_nfd_4d_ptr
429#     include "mpp_nfd_generic.h90"
430#     undef ROUTINE_NFD
431#     undef MULTI
432#  undef DIM_4d
433
434
435   !!----------------------------------------------------------------------
436   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
437   !!
438   !!   * Argument : dummy argument use in mpp_lnk_... routines
439   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
440   !!                cd_nat :   nature of array grid-points
441   !!                psgn   :   sign used across the north fold boundary
442   !!                kb_bdy :   BDY boundary set
443   !!                kfld   :   optional, number of pt3d arrays
444   !!----------------------------------------------------------------------
445   !
446   !                       !==  2D array and array of 2D pointer  ==!
447   !
448#  define DIM_2d
449#     define ROUTINE_BDY           mpp_lnk_bdy_2d
450#     include "mpp_bdy_generic.h90"
451#     undef ROUTINE_BDY
452#  undef DIM_2d
453   !
454   !                       !==  3D array and array of 3D pointer  ==!
455   !
456#  define DIM_3d
457#     define ROUTINE_BDY           mpp_lnk_bdy_3d
458#     include "mpp_bdy_generic.h90"
459#     undef ROUTINE_BDY
460#  undef DIM_3d
461   !
462   !                       !==  4D array and array of 4D pointer  ==!
463   !
464#  define DIM_4d
465#     define ROUTINE_BDY           mpp_lnk_bdy_4d
466#     include "mpp_bdy_generic.h90"
467#     undef ROUTINE_BDY
468#  undef DIM_4d
469
470   !!----------------------------------------------------------------------
471   !!
472   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
473   
474   
475   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
476   
477   
478   !!----------------------------------------------------------------------
479
480
481
482   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
483      !!----------------------------------------------------------------------
484      !!                  ***  routine mppsend  ***
485      !!
486      !! ** Purpose :   Send messag passing array
487      !!
488      !!----------------------------------------------------------------------
489      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
490      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
491      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
492      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
493      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
494      !!
495      INTEGER ::   iflag
496      !!----------------------------------------------------------------------
497      !
498      SELECT CASE ( cn_mpi_send )
499      CASE ( 'S' )                ! Standard mpi send (blocking)
500         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
501      CASE ( 'B' )                ! Buffer mpi send (blocking)
502         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
503      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
504         ! be carefull, one more argument here : the mpi request identifier..
505         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
506      END SELECT
507      !
508   END SUBROUTINE mppsend
509
510
511   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
512      !!----------------------------------------------------------------------
513      !!                  ***  routine mpprecv  ***
514      !!
515      !! ** Purpose :   Receive messag passing array
516      !!
517      !!----------------------------------------------------------------------
518      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
519      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
520      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
521      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
522      !!
523      INTEGER :: istatus(mpi_status_size)
524      INTEGER :: iflag
525      INTEGER :: use_source
526      !!----------------------------------------------------------------------
527      !
528      ! If a specific process number has been passed to the receive call,
529      ! use that one. Default is to use mpi_any_source
530      use_source = mpi_any_source
531      IF( PRESENT(ksource) )   use_source = ksource
532      !
533      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
534      !
535   END SUBROUTINE mpprecv
536
537
538   SUBROUTINE mppgather( ptab, kp, pio )
539      !!----------------------------------------------------------------------
540      !!                   ***  routine mppgather  ***
541      !!
542      !! ** Purpose :   Transfert between a local subdomain array and a work
543      !!     array which is distributed following the vertical level.
544      !!
545      !!----------------------------------------------------------------------
546      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
547      INTEGER                           , INTENT(in   ) ::   kp     ! record length
548      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
549      !!
550      INTEGER :: itaille, ierror   ! temporary integer
551      !!---------------------------------------------------------------------
552      !
553      itaille = jpi * jpj
554      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
555         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
556      !
557   END SUBROUTINE mppgather
558
559
560   SUBROUTINE mppscatter( pio, kp, ptab )
561      !!----------------------------------------------------------------------
562      !!                  ***  routine mppscatter  ***
563      !!
564      !! ** Purpose :   Transfert between awork array which is distributed
565      !!      following the vertical level and the local subdomain array.
566      !!
567      !!----------------------------------------------------------------------
568      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
569      INTEGER                             ::   kp     ! Tag (not used with MPI
570      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
571      !!
572      INTEGER :: itaille, ierror   ! temporary integer
573      !!---------------------------------------------------------------------
574      !
575      itaille = jpi * jpj
576      !
577      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
578         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
579      !
580   END SUBROUTINE mppscatter
581
582   
583   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
584     !!----------------------------------------------------------------------
585      !!                   ***  routine mpp_delay_sum  ***
586      !!
587      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
588      !!
589      !!----------------------------------------------------------------------
590      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
591      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
592      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
593      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
594      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
595      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
596      !!
597      INTEGER ::   ji, isz
598      INTEGER ::   idvar
599      INTEGER ::   ierr, ilocalcomm
600      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
601      !!----------------------------------------------------------------------
602      ilocalcomm = mpi_comm_oce
603      IF( PRESENT(kcom) )   ilocalcomm = kcom
604
605      isz = SIZE(y_in)
606     
607      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
608
609      idvar = -1
610      DO ji = 1, nbdelay
611         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
612      END DO
613      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
614
615      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
616         !                                       --------------------------
617         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
618            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
619            DEALLOCATE(todelay(idvar)%z1d)
620            ndelayid(idvar) = -1                                      ! do as if we had no restart
621         ELSE
622            ALLOCATE(todelay(idvar)%y1d(isz))
623            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
624         END IF
625      ENDIF
626     
627      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
628         !                                       --------------------------
629         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
630         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
631         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
632      ENDIF
633
634      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
635
636      ! send back pout from todelay(idvar)%z1d defined at previous call
637      pout(:) = todelay(idvar)%z1d(:)
638
639      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
640      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
641
642   END SUBROUTINE mpp_delay_sum
643
644   
645   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
646      !!----------------------------------------------------------------------
647      !!                   ***  routine mpp_delay_max  ***
648      !!
649      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
650      !!
651      !!----------------------------------------------------------------------
652      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
653      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
654      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
655      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
656      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
657      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
658      !!
659      INTEGER ::   ji, isz
660      INTEGER ::   idvar
661      INTEGER ::   ierr, ilocalcomm
662      !!----------------------------------------------------------------------
663      ilocalcomm = mpi_comm_oce
664      IF( PRESENT(kcom) )   ilocalcomm = kcom
665
666      isz = SIZE(p_in)
667
668      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
669
670      idvar = -1
671      DO ji = 1, nbdelay
672         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
673      END DO
674      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
675
676      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
677         !                                       --------------------------
678         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
679            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
680            DEALLOCATE(todelay(idvar)%z1d)
681            ndelayid(idvar) = -1                                      ! do as if we had no restart
682         END IF
683      ENDIF
684
685      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
686         !                                       --------------------------
687         ALLOCATE(todelay(idvar)%z1d(isz))
688         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
689      ENDIF
690
691      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
692
693      ! send back pout from todelay(idvar)%z1d defined at previous call
694      pout(:) = todelay(idvar)%z1d(:)
695
696      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
697      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
698
699   END SUBROUTINE mpp_delay_max
700
701   
702   SUBROUTINE mpp_delay_rcv( kid )
703      !!----------------------------------------------------------------------
704      !!                   ***  routine mpp_delay_rcv  ***
705      !!
706      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
707      !!
708      !!----------------------------------------------------------------------
709      INTEGER,INTENT(in   )      ::  kid 
710      INTEGER ::   ierr
711      !!----------------------------------------------------------------------
712      IF( ndelayid(kid) /= -2 ) THEN 
713         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
714         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
715         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
716         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
717         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
718      ENDIF
719   END SUBROUTINE mpp_delay_rcv
720
721   
722   !!----------------------------------------------------------------------
723   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
724   !!   
725   !!----------------------------------------------------------------------
726   !!
727#  define OPERATION_MAX
728#  define INTEGER_TYPE
729#  define DIM_0d
730#     define ROUTINE_ALLREDUCE           mppmax_int
731#     include "mpp_allreduce_generic.h90"
732#     undef ROUTINE_ALLREDUCE
733#  undef DIM_0d
734#  define DIM_1d
735#     define ROUTINE_ALLREDUCE           mppmax_a_int
736#     include "mpp_allreduce_generic.h90"
737#     undef ROUTINE_ALLREDUCE
738#  undef DIM_1d
739#  undef INTEGER_TYPE
740!
741#  define REAL_TYPE
742#  define DIM_0d
743#     define ROUTINE_ALLREDUCE           mppmax_real
744#     include "mpp_allreduce_generic.h90"
745#     undef ROUTINE_ALLREDUCE
746#  undef DIM_0d
747#  define DIM_1d
748#     define ROUTINE_ALLREDUCE           mppmax_a_real
749#     include "mpp_allreduce_generic.h90"
750#     undef ROUTINE_ALLREDUCE
751#  undef DIM_1d
752#  undef REAL_TYPE
753#  undef OPERATION_MAX
754   !!----------------------------------------------------------------------
755   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
756   !!   
757   !!----------------------------------------------------------------------
758   !!
759#  define OPERATION_MIN
760#  define INTEGER_TYPE
761#  define DIM_0d
762#     define ROUTINE_ALLREDUCE           mppmin_int
763#     include "mpp_allreduce_generic.h90"
764#     undef ROUTINE_ALLREDUCE
765#  undef DIM_0d
766#  define DIM_1d
767#     define ROUTINE_ALLREDUCE           mppmin_a_int
768#     include "mpp_allreduce_generic.h90"
769#     undef ROUTINE_ALLREDUCE
770#  undef DIM_1d
771#  undef INTEGER_TYPE
772!
773#  define REAL_TYPE
774#  define DIM_0d
775#     define ROUTINE_ALLREDUCE           mppmin_real
776#     include "mpp_allreduce_generic.h90"
777#     undef ROUTINE_ALLREDUCE
778#  undef DIM_0d
779#  define DIM_1d
780#     define ROUTINE_ALLREDUCE           mppmin_a_real
781#     include "mpp_allreduce_generic.h90"
782#     undef ROUTINE_ALLREDUCE
783#  undef DIM_1d
784#  undef REAL_TYPE
785#  undef OPERATION_MIN
786
787   !!----------------------------------------------------------------------
788   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
789   !!   
790   !!   Global sum of 1D array or a variable (integer, real or complex)
791   !!----------------------------------------------------------------------
792   !!
793#  define OPERATION_SUM
794#  define INTEGER_TYPE
795#  define DIM_0d
796#     define ROUTINE_ALLREDUCE           mppsum_int
797#     include "mpp_allreduce_generic.h90"
798#     undef ROUTINE_ALLREDUCE
799#  undef DIM_0d
800#  define DIM_1d
801#     define ROUTINE_ALLREDUCE           mppsum_a_int
802#     include "mpp_allreduce_generic.h90"
803#     undef ROUTINE_ALLREDUCE
804#  undef DIM_1d
805#  undef INTEGER_TYPE
806!
807#  define REAL_TYPE
808#  define DIM_0d
809#     define ROUTINE_ALLREDUCE           mppsum_real
810#     include "mpp_allreduce_generic.h90"
811#     undef ROUTINE_ALLREDUCE
812#  undef DIM_0d
813#  define DIM_1d
814#     define ROUTINE_ALLREDUCE           mppsum_a_real
815#     include "mpp_allreduce_generic.h90"
816#     undef ROUTINE_ALLREDUCE
817#  undef DIM_1d
818#  undef REAL_TYPE
819#  undef OPERATION_SUM
820
821#  define OPERATION_SUM_DD
822#  define COMPLEX_TYPE
823#  define DIM_0d
824#     define ROUTINE_ALLREDUCE           mppsum_realdd
825#     include "mpp_allreduce_generic.h90"
826#     undef ROUTINE_ALLREDUCE
827#  undef DIM_0d
828#  define DIM_1d
829#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
830#     include "mpp_allreduce_generic.h90"
831#     undef ROUTINE_ALLREDUCE
832#  undef DIM_1d
833#  undef COMPLEX_TYPE
834#  undef OPERATION_SUM_DD
835
836   !!----------------------------------------------------------------------
837   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
838   !!   
839   !!----------------------------------------------------------------------
840   !!
841#  define OPERATION_MINLOC
842#  define DIM_2d
843#     define ROUTINE_LOC           mpp_minloc2d
844#     include "mpp_loc_generic.h90"
845#     undef ROUTINE_LOC
846#  undef DIM_2d
847#  define DIM_3d
848#     define ROUTINE_LOC           mpp_minloc3d
849#     include "mpp_loc_generic.h90"
850#     undef ROUTINE_LOC
851#  undef DIM_3d
852#  undef OPERATION_MINLOC
853
854#  define OPERATION_MAXLOC
855#  define DIM_2d
856#     define ROUTINE_LOC           mpp_maxloc2d
857#     include "mpp_loc_generic.h90"
858#     undef ROUTINE_LOC
859#  undef DIM_2d
860#  define DIM_3d
861#     define ROUTINE_LOC           mpp_maxloc3d
862#     include "mpp_loc_generic.h90"
863#     undef ROUTINE_LOC
864#  undef DIM_3d
865#  undef OPERATION_MAXLOC
866
867   SUBROUTINE mppsync()
868      !!----------------------------------------------------------------------
869      !!                  ***  routine mppsync  ***
870      !!
871      !! ** Purpose :   Massively parallel processors, synchroneous
872      !!
873      !!-----------------------------------------------------------------------
874      INTEGER :: ierror
875      !!-----------------------------------------------------------------------
876      !
877      CALL mpi_barrier( mpi_comm_oce, ierror )
878      !
879   END SUBROUTINE mppsync
880
881
882   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
883      !!----------------------------------------------------------------------
884      !!                  ***  routine mppstop  ***
885      !!
886      !! ** purpose :   Stop massively parallel processors method
887      !!
888      !!----------------------------------------------------------------------
889      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
890      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
891      LOGICAL ::   llfinal, ll_force_abort
892      INTEGER ::   info
893      !!----------------------------------------------------------------------
894      llfinal = .FALSE.
895      IF( PRESENT(ldfinal) ) llfinal = ldfinal
896      ll_force_abort = .FALSE.
897      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
898      !
899      IF(ll_force_abort) THEN
900         CALL mpi_abort( MPI_COMM_WORLD )
901      ELSE
902         CALL mppsync
903         CALL mpi_finalize( info )
904      ENDIF
905      IF( .NOT. llfinal ) STOP 123456
906      !
907   END SUBROUTINE mppstop
908
909
910   SUBROUTINE mpp_comm_free( kcom )
911      !!----------------------------------------------------------------------
912      INTEGER, INTENT(in) ::   kcom
913      !!
914      INTEGER :: ierr
915      !!----------------------------------------------------------------------
916      !
917      CALL MPI_COMM_FREE(kcom, ierr)
918      !
919   END SUBROUTINE mpp_comm_free
920
921
922   SUBROUTINE mpp_ini_znl( kumout )
923      !!----------------------------------------------------------------------
924      !!               ***  routine mpp_ini_znl  ***
925      !!
926      !! ** Purpose :   Initialize special communicator for computing zonal sum
927      !!
928      !! ** Method  : - Look for processors in the same row
929      !!              - Put their number in nrank_znl
930      !!              - Create group for the znl processors
931      !!              - Create a communicator for znl processors
932      !!              - Determine if processor should write znl files
933      !!
934      !! ** output
935      !!      ndim_rank_znl = number of processors on the same row
936      !!      ngrp_znl = group ID for the znl processors
937      !!      ncomm_znl = communicator for the ice procs.
938      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
939      !!
940      !!----------------------------------------------------------------------
941      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
942      !
943      INTEGER :: jproc      ! dummy loop integer
944      INTEGER :: ierr, ii   ! local integer
945      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
946      !!----------------------------------------------------------------------
947      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
948      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
949      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
950      !
951      ALLOCATE( kwork(jpnij), STAT=ierr )
952      IF( ierr /= 0 ) THEN
953         WRITE(kumout, cform_err)
954         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
955         CALL mppstop
956      ENDIF
957
958      IF( jpnj == 1 ) THEN
959         ngrp_znl  = ngrp_world
960         ncomm_znl = mpi_comm_oce
961      ELSE
962         !
963         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
964         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
965         !-$$        CALL flush(numout)
966         !
967         ! Count number of processors on the same row
968         ndim_rank_znl = 0
969         DO jproc=1,jpnij
970            IF ( kwork(jproc) == njmpp ) THEN
971               ndim_rank_znl = ndim_rank_znl + 1
972            ENDIF
973         END DO
974         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
975         !-$$        CALL flush(numout)
976         ! Allocate the right size to nrank_znl
977         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
978         ALLOCATE(nrank_znl(ndim_rank_znl))
979         ii = 0
980         nrank_znl (:) = 0
981         DO jproc=1,jpnij
982            IF ( kwork(jproc) == njmpp) THEN
983               ii = ii + 1
984               nrank_znl(ii) = jproc -1
985            ENDIF
986         END DO
987         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
988         !-$$        CALL flush(numout)
989
990         ! Create the opa group
991         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
992         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
993         !-$$        CALL flush(numout)
994
995         ! Create the znl group from the opa group
996         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
997         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
998         !-$$        CALL flush(numout)
999
1000         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1001         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1002         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1003         !-$$        CALL flush(numout)
1004         !
1005      END IF
1006
1007      ! Determines if processor if the first (starting from i=1) on the row
1008      IF ( jpni == 1 ) THEN
1009         l_znl_root = .TRUE.
1010      ELSE
1011         l_znl_root = .FALSE.
1012         kwork (1) = nimpp
1013         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1014         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1015      END IF
1016
1017      DEALLOCATE(kwork)
1018
1019   END SUBROUTINE mpp_ini_znl
1020
1021
1022   SUBROUTINE mpp_ini_north
1023      !!----------------------------------------------------------------------
1024      !!               ***  routine mpp_ini_north  ***
1025      !!
1026      !! ** Purpose :   Initialize special communicator for north folding
1027      !!      condition together with global variables needed in the mpp folding
1028      !!
1029      !! ** Method  : - Look for northern processors
1030      !!              - Put their number in nrank_north
1031      !!              - Create groups for the world processors and the north processors
1032      !!              - Create a communicator for northern processors
1033      !!
1034      !! ** output
1035      !!      njmppmax = njmpp for northern procs
1036      !!      ndim_rank_north = number of processors in the northern line
1037      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1038      !!      ngrp_world = group ID for the world processors
1039      !!      ngrp_north = group ID for the northern processors
1040      !!      ncomm_north = communicator for the northern procs.
1041      !!      north_root = number (in the world) of proc 0 in the northern comm.
1042      !!
1043      !!----------------------------------------------------------------------
1044      INTEGER ::   ierr
1045      INTEGER ::   jjproc
1046      INTEGER ::   ii, ji
1047      !!----------------------------------------------------------------------
1048      !
1049      njmppmax = MAXVAL( njmppt )
1050      !
1051      ! Look for how many procs on the northern boundary
1052      ndim_rank_north = 0
1053      DO jjproc = 1, jpnij
1054         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1055      END DO
1056      !
1057      ! Allocate the right size to nrank_north
1058      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1059      ALLOCATE( nrank_north(ndim_rank_north) )
1060
1061      ! Fill the nrank_north array with proc. number of northern procs.
1062      ! Note : the rank start at 0 in MPI
1063      ii = 0
1064      DO ji = 1, jpnij
1065         IF ( njmppt(ji) == njmppmax   ) THEN
1066            ii=ii+1
1067            nrank_north(ii)=ji-1
1068         END IF
1069      END DO
1070      !
1071      ! create the world group
1072      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1073      !
1074      ! Create the North group from the world group
1075      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1076      !
1077      ! Create the North communicator , ie the pool of procs in the north group
1078      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1079      !
1080   END SUBROUTINE mpp_ini_north
1081
1082
1083   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1084      !!---------------------------------------------------------------------
1085      !!                   ***  routine mpp_init.opa  ***
1086      !!
1087      !! ** Purpose :: export and attach a MPI buffer for bsend
1088      !!
1089      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1090      !!            but classical mpi_init
1091      !!
1092      !! History :: 01/11 :: IDRIS initial version for IBM only
1093      !!            08/04 :: R. Benshila, generalisation
1094      !!---------------------------------------------------------------------
1095      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1096      INTEGER                      , INTENT(inout) ::   ksft
1097      INTEGER                      , INTENT(  out) ::   code
1098      INTEGER                                      ::   ierr, ji
1099      LOGICAL                                      ::   mpi_was_called
1100      !!---------------------------------------------------------------------
1101      !
1102      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1103      IF ( code /= MPI_SUCCESS ) THEN
1104         DO ji = 1, SIZE(ldtxt)
1105            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1106         END DO
1107         WRITE(*, cform_err)
1108         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1109         CALL mpi_abort( mpi_comm_world, code, ierr )
1110      ENDIF
1111      !
1112      IF( .NOT. mpi_was_called ) THEN
1113         CALL mpi_init( code )
1114         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1115         IF ( code /= MPI_SUCCESS ) THEN
1116            DO ji = 1, SIZE(ldtxt)
1117               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1118            END DO
1119            WRITE(*, cform_err)
1120            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1121            CALL mpi_abort( mpi_comm_world, code, ierr )
1122         ENDIF
1123      ENDIF
1124      !
1125      IF( nn_buffer > 0 ) THEN
1126         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1127         ! Buffer allocation and attachment
1128         ALLOCATE( tampon(nn_buffer), stat = ierr )
1129         IF( ierr /= 0 ) THEN
1130            DO ji = 1, SIZE(ldtxt)
1131               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1132            END DO
1133            WRITE(*, cform_err)
1134            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1135            CALL mpi_abort( mpi_comm_world, code, ierr )
1136         END IF
1137         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1138      ENDIF
1139      !
1140   END SUBROUTINE mpi_init_oce
1141
1142
1143   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1144      !!---------------------------------------------------------------------
1145      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1146      !!
1147      !!   Modification of original codes written by David H. Bailey
1148      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1149      !!---------------------------------------------------------------------
1150      INTEGER                     , INTENT(in)    ::   ilen, itype
1151      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1152      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1153      !
1154      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1155      INTEGER  :: ji, ztmp           ! local scalar
1156      !!---------------------------------------------------------------------
1157      !
1158      ztmp = itype   ! avoid compilation warning
1159      !
1160      DO ji=1,ilen
1161      ! Compute ydda + yddb using Knuth's trick.
1162         zt1  = real(ydda(ji)) + real(yddb(ji))
1163         zerr = zt1 - real(ydda(ji))
1164         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1165                + aimag(ydda(ji)) + aimag(yddb(ji))
1166
1167         ! The result is zt1 + zt2, after normalization.
1168         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1169      END DO
1170      !
1171   END SUBROUTINE DDPDD_MPI
1172
1173
1174   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1175      !!---------------------------------------------------------------------
1176      !!                   ***  routine mpp_lbc_north_icb  ***
1177      !!
1178      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1179      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1180      !!              array with outer extra halo
1181      !!
1182      !! ** Method  :   North fold condition and mpp with more than one proc
1183      !!              in i-direction require a specific treatment. We gather
1184      !!              the 4+kextj northern lines of the global domain on 1
1185      !!              processor and apply lbc north-fold on this sub array.
1186      !!              Then we scatter the north fold array back to the processors.
1187      !!              This routine accounts for an extra halo with icebergs
1188      !!              and assumes ghost rows and columns have been suppressed.
1189      !!
1190      !!----------------------------------------------------------------------
1191      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1192      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1193      !                                                     !   = T ,  U , V , F or W -points
1194      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1195      !!                                                    ! north fold, =  1. otherwise
1196      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1197      !
1198      INTEGER ::   ji, jj, jr
1199      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1200      INTEGER ::   ipj, ij, iproc
1201      !
1202      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1203      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1204      !!----------------------------------------------------------------------
1205      !
1206      ipj=4
1207      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1208     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1209     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1210      !
1211      ztab_e(:,:)      = 0._wp
1212      znorthloc_e(:,:) = 0._wp
1213      !
1214      ij = 1 - kextj
1215      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1216      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1217         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1218         ij = ij + 1
1219      END DO
1220      !
1221      itaille = jpimax * ( ipj + 2*kextj )
1222      !
1223      IF( ln_timing ) CALL tic_tac(.TRUE.)
1224      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1225         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1226         &                ncomm_north, ierr )
1227      !
1228      IF( ln_timing ) CALL tic_tac(.FALSE.)
1229      !
1230      DO jr = 1, ndim_rank_north            ! recover the global north array
1231         iproc = nrank_north(jr) + 1
1232         ildi = nldit (iproc)
1233         ilei = nleit (iproc)
1234         iilb = nimppt(iproc)
1235         DO jj = 1-kextj, ipj+kextj
1236            DO ji = ildi, ilei
1237               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1238            END DO
1239         END DO
1240      END DO
1241
1242      ! 2. North-Fold boundary conditions
1243      ! ----------------------------------
1244      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1245
1246      ij = 1 - kextj
1247      !! Scatter back to pt2d
1248      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1249         DO ji= 1, jpi
1250            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1251         END DO
1252         ij  = ij +1
1253      END DO
1254      !
1255      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1256      !
1257   END SUBROUTINE mpp_lbc_north_icb
1258
1259
1260   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1261      !!----------------------------------------------------------------------
1262      !!                  ***  routine mpp_lnk_2d_icb  ***
1263      !!
1264      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1265      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1266      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1267      !!
1268      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1269      !!      between processors following neighboring subdomains.
1270      !!            domain parameters
1271      !!                    jpi    : first dimension of the local subdomain
1272      !!                    jpj    : second dimension of the local subdomain
1273      !!                    kexti  : number of columns for extra outer halo
1274      !!                    kextj  : number of rows for extra outer halo
1275      !!                    nbondi : mark for "east-west local boundary"
1276      !!                    nbondj : mark for "north-south local boundary"
1277      !!                    noea   : number for local neighboring processors
1278      !!                    nowe   : number for local neighboring processors
1279      !!                    noso   : number for local neighboring processors
1280      !!                    nono   : number for local neighboring processors
1281      !!----------------------------------------------------------------------
1282      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1283      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1284      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1285      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1286      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1287      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1288      !
1289      INTEGER  ::   jl   ! dummy loop indices
1290      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1291      INTEGER  ::   ipreci, iprecj             !   -       -
1292      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1293      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1294      !!
1295      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1296      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1297      !!----------------------------------------------------------------------
1298
1299      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1300      iprecj = nn_hls + kextj
1301
1302      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1303
1304      ! 1. standard boundary treatment
1305      ! ------------------------------
1306      ! Order matters Here !!!!
1307      !
1308      !                                      ! East-West boundaries
1309      !                                           !* Cyclic east-west
1310      IF( l_Iperio ) THEN
1311         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1312         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1313         !
1314      ELSE                                        !* closed
1315         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1316                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1317      ENDIF
1318      !                                      ! North-South boundaries
1319      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1320         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1321         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1322      ELSE                                        !* closed
1323         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1324                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1325      ENDIF
1326      !
1327
1328      ! north fold treatment
1329      ! -----------------------
1330      IF( npolj /= 0 ) THEN
1331         !
1332         SELECT CASE ( jpni )
1333                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1334                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1335         END SELECT
1336         !
1337      ENDIF
1338
1339      ! 2. East and west directions exchange
1340      ! ------------------------------------
1341      ! we play with the neigbours AND the row number because of the periodicity
1342      !
1343      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1344      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1345         iihom = jpi-nreci-kexti
1346         DO jl = 1, ipreci
1347            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1348            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1349         END DO
1350      END SELECT
1351      !
1352      !                           ! Migrations
1353      imigr = ipreci * ( jpj + 2*kextj )
1354      !
1355      IF( ln_timing ) CALL tic_tac(.TRUE.)
1356      !
1357      SELECT CASE ( nbondi )
1358      CASE ( -1 )
1359         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1360         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1361         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1362      CASE ( 0 )
1363         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1364         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1365         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1366         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1367         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1368         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1369      CASE ( 1 )
1370         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1371         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1372         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1373      END SELECT
1374      !
1375      IF( ln_timing ) CALL tic_tac(.FALSE.)
1376      !
1377      !                           ! Write Dirichlet lateral conditions
1378      iihom = jpi - nn_hls
1379      !
1380      SELECT CASE ( nbondi )
1381      CASE ( -1 )
1382         DO jl = 1, ipreci
1383            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1384         END DO
1385      CASE ( 0 )
1386         DO jl = 1, ipreci
1387            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1388            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1389         END DO
1390      CASE ( 1 )
1391         DO jl = 1, ipreci
1392            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1393         END DO
1394      END SELECT
1395
1396
1397      ! 3. North and south directions
1398      ! -----------------------------
1399      ! always closed : we play only with the neigbours
1400      !
1401      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1402         ijhom = jpj-nrecj-kextj
1403         DO jl = 1, iprecj
1404            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1405            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1406         END DO
1407      ENDIF
1408      !
1409      !                           ! Migrations
1410      imigr = iprecj * ( jpi + 2*kexti )
1411      !
1412      IF( ln_timing ) CALL tic_tac(.TRUE.)
1413      !
1414      SELECT CASE ( nbondj )
1415      CASE ( -1 )
1416         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1417         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1418         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1419      CASE ( 0 )
1420         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1421         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1422         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1423         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1424         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1425         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1426      CASE ( 1 )
1427         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1428         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1429         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1430      END SELECT
1431      !
1432      IF( ln_timing ) CALL tic_tac(.FALSE.)
1433      !
1434      !                           ! Write Dirichlet lateral conditions
1435      ijhom = jpj - nn_hls
1436      !
1437      SELECT CASE ( nbondj )
1438      CASE ( -1 )
1439         DO jl = 1, iprecj
1440            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1441         END DO
1442      CASE ( 0 )
1443         DO jl = 1, iprecj
1444            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1445            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1446         END DO
1447      CASE ( 1 )
1448         DO jl = 1, iprecj
1449            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1450         END DO
1451      END SELECT
1452      !
1453   END SUBROUTINE mpp_lnk_2d_icb
1454
1455
1456   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1457      !!----------------------------------------------------------------------
1458      !!                  ***  routine mpp_report  ***
1459      !!
1460      !! ** Purpose :   report use of mpp routines per time-setp
1461      !!
1462      !!----------------------------------------------------------------------
1463      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1464      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1465      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1466      !!
1467      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1468      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1469      !!----------------------------------------------------------------------
1470      !
1471      ll_lbc = .FALSE.
1472      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1473      ll_glb = .FALSE.
1474      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1475      ll_dlg = .FALSE.
1476      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1477      !
1478      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1479      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1480      ncom_freq = ncom_fsbc
1481      !
1482      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1483         IF( ll_lbc ) THEN
1484            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1485            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1486            n_sequence_lbc = n_sequence_lbc + 1
1487            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1488            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1489            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1490            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1491         ENDIF
1492         IF( ll_glb ) THEN
1493            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1494            n_sequence_glb = n_sequence_glb + 1
1495            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1496            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1497         ENDIF
1498         IF( ll_dlg ) THEN
1499            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1500            n_sequence_dlg = n_sequence_dlg + 1
1501            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1502            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1503         ENDIF
1504      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1505         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1506         WRITE(numcom,*) ' '
1507         WRITE(numcom,*) ' ------------------------------------------------------------'
1508         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1509         WRITE(numcom,*) ' ------------------------------------------------------------'
1510         WRITE(numcom,*) ' '
1511         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1512         jj = 0; jk = 0; jf = 0; jh = 0
1513         DO ji = 1, n_sequence_lbc
1514            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1515            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1516            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1517            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1518         END DO
1519         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1520         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1521         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1522         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1523         WRITE(numcom,*) ' '
1524         WRITE(numcom,*) ' lbc_lnk called'
1525         jj = 1
1526         DO ji = 2, n_sequence_lbc
1527            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1528               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1529               jj = 0
1530            END IF
1531            jj = jj + 1 
1532         END DO
1533         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1534         WRITE(numcom,*) ' '
1535         IF ( n_sequence_glb > 0 ) THEN
1536            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1537            jj = 1
1538            DO ji = 2, n_sequence_glb
1539               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1540                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1541                  jj = 0
1542               END IF
1543               jj = jj + 1 
1544            END DO
1545            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1546            DEALLOCATE(crname_glb)
1547         ELSE
1548            WRITE(numcom,*) ' No MPI global communication '
1549         ENDIF
1550         WRITE(numcom,*) ' '
1551         IF ( n_sequence_dlg > 0 ) THEN
1552            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1553            jj = 1
1554            DO ji = 2, n_sequence_dlg
1555               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1556                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1557                  jj = 0
1558               END IF
1559               jj = jj + 1 
1560            END DO
1561            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1562            DEALLOCATE(crname_dlg)
1563         ELSE
1564            WRITE(numcom,*) ' No MPI delayed global communication '
1565         ENDIF
1566         WRITE(numcom,*) ' '
1567         WRITE(numcom,*) ' -----------------------------------------------'
1568         WRITE(numcom,*) ' '
1569         DEALLOCATE(ncomm_sequence)
1570         DEALLOCATE(crname_lbc)
1571      ENDIF
1572   END SUBROUTINE mpp_report
1573
1574   
1575   SUBROUTINE tic_tac (ld_tic, ld_global)
1576
1577    LOGICAL,           INTENT(IN) :: ld_tic
1578    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1579    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1580    REAL(wp),               SAVE :: tic_ct = 0._wp
1581    INTEGER :: ii
1582
1583    IF( ncom_stp <= nit000 ) RETURN
1584    IF( ncom_stp == nitend ) RETURN
1585    ii = 1
1586    IF( PRESENT( ld_global ) ) THEN
1587       IF( ld_global ) ii = 2
1588    END IF
1589   
1590    IF ( ld_tic ) THEN
1591       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1592       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1593    ELSE
1594       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1595       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1596    ENDIF
1597   
1598   END SUBROUTINE tic_tac
1599
1600   
1601#else
1602   !!----------------------------------------------------------------------
1603   !!   Default case:            Dummy module        share memory computing
1604   !!----------------------------------------------------------------------
1605   USE in_out_manager
1606
1607   INTERFACE mpp_sum
1608      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1609   END INTERFACE
1610   INTERFACE mpp_max
1611      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1612   END INTERFACE
1613   INTERFACE mpp_min
1614      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1615   END INTERFACE
1616   INTERFACE mpp_minloc
1617      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1618   END INTERFACE
1619   INTERFACE mpp_maxloc
1620      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1621   END INTERFACE
1622
1623   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1624   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1625   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1626
1627   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1628   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1629   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1630   TYPE ::   DELAYARR
1631      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1632      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1633   END TYPE DELAYARR
1634   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1635   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1636   !!----------------------------------------------------------------------
1637CONTAINS
1638
1639   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1640      INTEGER, INTENT(in) ::   kumout
1641      lib_mpp_alloc = 0
1642   END FUNCTION lib_mpp_alloc
1643
1644   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1645      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1646      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1647      CHARACTER(len=*) ::   ldname
1648      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1649      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1650      function_value = 0
1651      IF( .FALSE. )   ldtxt(:) = 'never done'
1652      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1653   END FUNCTION mynode
1654
1655   SUBROUTINE mppsync                       ! Dummy routine
1656   END SUBROUTINE mppsync
1657
1658   !!----------------------------------------------------------------------
1659   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1660   !!   
1661   !!----------------------------------------------------------------------
1662   !!
1663#  define OPERATION_MAX
1664#  define INTEGER_TYPE
1665#  define DIM_0d
1666#     define ROUTINE_ALLREDUCE           mppmax_int
1667#     include "mpp_allreduce_generic.h90"
1668#     undef ROUTINE_ALLREDUCE
1669#  undef DIM_0d
1670#  define DIM_1d
1671#     define ROUTINE_ALLREDUCE           mppmax_a_int
1672#     include "mpp_allreduce_generic.h90"
1673#     undef ROUTINE_ALLREDUCE
1674#  undef DIM_1d
1675#  undef INTEGER_TYPE
1676!
1677#  define REAL_TYPE
1678#  define DIM_0d
1679#     define ROUTINE_ALLREDUCE           mppmax_real
1680#     include "mpp_allreduce_generic.h90"
1681#     undef ROUTINE_ALLREDUCE
1682#  undef DIM_0d
1683#  define DIM_1d
1684#     define ROUTINE_ALLREDUCE           mppmax_a_real
1685#     include "mpp_allreduce_generic.h90"
1686#     undef ROUTINE_ALLREDUCE
1687#  undef DIM_1d
1688#  undef REAL_TYPE
1689#  undef OPERATION_MAX
1690   !!----------------------------------------------------------------------
1691   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1692   !!   
1693   !!----------------------------------------------------------------------
1694   !!
1695#  define OPERATION_MIN
1696#  define INTEGER_TYPE
1697#  define DIM_0d
1698#     define ROUTINE_ALLREDUCE           mppmin_int
1699#     include "mpp_allreduce_generic.h90"
1700#     undef ROUTINE_ALLREDUCE
1701#  undef DIM_0d
1702#  define DIM_1d
1703#     define ROUTINE_ALLREDUCE           mppmin_a_int
1704#     include "mpp_allreduce_generic.h90"
1705#     undef ROUTINE_ALLREDUCE
1706#  undef DIM_1d
1707#  undef INTEGER_TYPE
1708!
1709#  define REAL_TYPE
1710#  define DIM_0d
1711#     define ROUTINE_ALLREDUCE           mppmin_real
1712#     include "mpp_allreduce_generic.h90"
1713#     undef ROUTINE_ALLREDUCE
1714#  undef DIM_0d
1715#  define DIM_1d
1716#     define ROUTINE_ALLREDUCE           mppmin_a_real
1717#     include "mpp_allreduce_generic.h90"
1718#     undef ROUTINE_ALLREDUCE
1719#  undef DIM_1d
1720#  undef REAL_TYPE
1721#  undef OPERATION_MIN
1722
1723   !!----------------------------------------------------------------------
1724   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1725   !!   
1726   !!   Global sum of 1D array or a variable (integer, real or complex)
1727   !!----------------------------------------------------------------------
1728   !!
1729#  define OPERATION_SUM
1730#  define INTEGER_TYPE
1731#  define DIM_0d
1732#     define ROUTINE_ALLREDUCE           mppsum_int
1733#     include "mpp_allreduce_generic.h90"
1734#     undef ROUTINE_ALLREDUCE
1735#  undef DIM_0d
1736#  define DIM_1d
1737#     define ROUTINE_ALLREDUCE           mppsum_a_int
1738#     include "mpp_allreduce_generic.h90"
1739#     undef ROUTINE_ALLREDUCE
1740#  undef DIM_1d
1741#  undef INTEGER_TYPE
1742!
1743#  define REAL_TYPE
1744#  define DIM_0d
1745#     define ROUTINE_ALLREDUCE           mppsum_real
1746#     include "mpp_allreduce_generic.h90"
1747#     undef ROUTINE_ALLREDUCE
1748#  undef DIM_0d
1749#  define DIM_1d
1750#     define ROUTINE_ALLREDUCE           mppsum_a_real
1751#     include "mpp_allreduce_generic.h90"
1752#     undef ROUTINE_ALLREDUCE
1753#  undef DIM_1d
1754#  undef REAL_TYPE
1755#  undef OPERATION_SUM
1756
1757#  define OPERATION_SUM_DD
1758#  define COMPLEX_TYPE
1759#  define DIM_0d
1760#     define ROUTINE_ALLREDUCE           mppsum_realdd
1761#     include "mpp_allreduce_generic.h90"
1762#     undef ROUTINE_ALLREDUCE
1763#  undef DIM_0d
1764#  define DIM_1d
1765#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1766#     include "mpp_allreduce_generic.h90"
1767#     undef ROUTINE_ALLREDUCE
1768#  undef DIM_1d
1769#  undef COMPLEX_TYPE
1770#  undef OPERATION_SUM_DD
1771
1772   !!----------------------------------------------------------------------
1773   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1774   !!   
1775   !!----------------------------------------------------------------------
1776   !!
1777#  define OPERATION_MINLOC
1778#  define DIM_2d
1779#     define ROUTINE_LOC           mpp_minloc2d
1780#     include "mpp_loc_generic.h90"
1781#     undef ROUTINE_LOC
1782#  undef DIM_2d
1783#  define DIM_3d
1784#     define ROUTINE_LOC           mpp_minloc3d
1785#     include "mpp_loc_generic.h90"
1786#     undef ROUTINE_LOC
1787#  undef DIM_3d
1788#  undef OPERATION_MINLOC
1789
1790#  define OPERATION_MAXLOC
1791#  define DIM_2d
1792#     define ROUTINE_LOC           mpp_maxloc2d
1793#     include "mpp_loc_generic.h90"
1794#     undef ROUTINE_LOC
1795#  undef DIM_2d
1796#  define DIM_3d
1797#     define ROUTINE_LOC           mpp_maxloc3d
1798#     include "mpp_loc_generic.h90"
1799#     undef ROUTINE_LOC
1800#  undef DIM_3d
1801#  undef OPERATION_MAXLOC
1802
1803   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1804      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1805      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1806      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1807      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1808      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1809      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1810      !
1811      pout(:) = REAL(y_in(:), wp)
1812   END SUBROUTINE mpp_delay_sum
1813
1814   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1815      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1816      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1817      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1818      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1819      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1820      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1821      !
1822      pout(:) = p_in(:)
1823   END SUBROUTINE mpp_delay_max
1824
1825   SUBROUTINE mpp_delay_rcv( kid )
1826      INTEGER,INTENT(in   )      ::  kid 
1827      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1828   END SUBROUTINE mpp_delay_rcv
1829   
1830   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1831      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1832      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1833      STOP      ! non MPP case, just stop the run
1834   END SUBROUTINE mppstop
1835
1836   SUBROUTINE mpp_ini_znl( knum )
1837      INTEGER :: knum
1838      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1839   END SUBROUTINE mpp_ini_znl
1840
1841   SUBROUTINE mpp_comm_free( kcom )
1842      INTEGER :: kcom
1843      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1844   END SUBROUTINE mpp_comm_free
1845   
1846#endif
1847
1848   !!----------------------------------------------------------------------
1849   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1850   !!----------------------------------------------------------------------
1851
1852   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1853      &                 cd6, cd7, cd8, cd9, cd10 )
1854      !!----------------------------------------------------------------------
1855      !!                  ***  ROUTINE  stop_opa  ***
1856      !!
1857      !! ** Purpose :   print in ocean.outpput file a error message and
1858      !!                increment the error number (nstop) by one.
1859      !!----------------------------------------------------------------------
1860      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1861      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1862      !!----------------------------------------------------------------------
1863      !
1864      nstop = nstop + 1
1865
1866      ! force to open ocean.output file
1867      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1868       
1869      WRITE(numout,cform_err)
1870      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1871      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1872      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1873      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1874      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1875      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1876      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1877      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1878      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1879      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1880
1881                               CALL FLUSH(numout    )
1882      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1883      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1884      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1885      !
1886      IF( cd1 == 'STOP' ) THEN
1887         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1888         CALL mppstop(ld_force_abort = .true.)
1889      ENDIF
1890      !
1891   END SUBROUTINE ctl_stop
1892
1893
1894   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1895      &                 cd6, cd7, cd8, cd9, cd10 )
1896      !!----------------------------------------------------------------------
1897      !!                  ***  ROUTINE  stop_warn  ***
1898      !!
1899      !! ** Purpose :   print in ocean.outpput file a error message and
1900      !!                increment the warning number (nwarn) by one.
1901      !!----------------------------------------------------------------------
1902      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1903      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1904      !!----------------------------------------------------------------------
1905      !
1906      nwarn = nwarn + 1
1907      IF(lwp) THEN
1908         WRITE(numout,cform_war)
1909         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1910         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1911         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1912         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1913         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1914         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1915         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1916         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1917         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1918         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1919      ENDIF
1920      CALL FLUSH(numout)
1921      !
1922   END SUBROUTINE ctl_warn
1923
1924
1925   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1926      !!----------------------------------------------------------------------
1927      !!                  ***  ROUTINE ctl_opn  ***
1928      !!
1929      !! ** Purpose :   Open file and check if required file is available.
1930      !!
1931      !! ** Method  :   Fortan open
1932      !!----------------------------------------------------------------------
1933      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1934      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1935      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1936      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1937      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1938      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1939      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1940      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1941      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1942      !
1943      CHARACTER(len=80) ::   clfile
1944      INTEGER           ::   iost
1945      !!----------------------------------------------------------------------
1946      !
1947      ! adapt filename
1948      ! ----------------
1949      clfile = TRIM(cdfile)
1950      IF( PRESENT( karea ) ) THEN
1951         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1952      ENDIF
1953#if defined key_agrif
1954      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1955      knum=Agrif_Get_Unit()
1956#else
1957      knum=get_unit()
1958#endif
1959      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1960      !
1961      iost=0
1962      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1963         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1964      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1965         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1966      ELSE
1967         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1968      ENDIF
1969      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1970         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1971      IF( iost == 0 ) THEN
1972         IF(ldwp) THEN
1973            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1974            WRITE(kout,*) '     unit   = ', knum
1975            WRITE(kout,*) '     status = ', cdstat
1976            WRITE(kout,*) '     form   = ', cdform
1977            WRITE(kout,*) '     access = ', cdacce
1978            WRITE(kout,*)
1979         ENDIF
1980      ENDIF
1981100   CONTINUE
1982      IF( iost /= 0 ) THEN
1983         IF(ldwp) THEN
1984            WRITE(kout,*)
1985            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1986            WRITE(kout,*) ' =======   ===  '
1987            WRITE(kout,*) '           unit   = ', knum
1988            WRITE(kout,*) '           status = ', cdstat
1989            WRITE(kout,*) '           form   = ', cdform
1990            WRITE(kout,*) '           access = ', cdacce
1991            WRITE(kout,*) '           iostat = ', iost
1992            WRITE(kout,*) '           we stop. verify the file '
1993            WRITE(kout,*)
1994         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1995            WRITE(*,*)
1996            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1997            WRITE(*,*) ' =======   ===  '
1998            WRITE(*,*) '           unit   = ', knum
1999            WRITE(*,*) '           status = ', cdstat
2000            WRITE(*,*) '           form   = ', cdform
2001            WRITE(*,*) '           access = ', cdacce
2002            WRITE(*,*) '           iostat = ', iost
2003            WRITE(*,*) '           we stop. verify the file '
2004            WRITE(*,*)
2005         ENDIF
2006         CALL FLUSH( kout ) 
2007         STOP 'ctl_opn bad opening'
2008      ENDIF
2009      !
2010   END SUBROUTINE ctl_opn
2011
2012
2013   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2014      !!----------------------------------------------------------------------
2015      !!                  ***  ROUTINE ctl_nam  ***
2016      !!
2017      !! ** Purpose :   Informations when error while reading a namelist
2018      !!
2019      !! ** Method  :   Fortan open
2020      !!----------------------------------------------------------------------
2021      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2022      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2023      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2024      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2025      !!----------------------------------------------------------------------
2026      !
2027      WRITE (clios, '(I5.0)')   kios
2028      IF( kios < 0 ) THEN         
2029         CALL ctl_warn( 'end of record or file while reading namelist '   &
2030            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2031      ENDIF
2032      !
2033      IF( kios > 0 ) THEN
2034         CALL ctl_stop( 'misspelled variable in namelist '   &
2035            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2036      ENDIF
2037      kios = 0
2038      RETURN
2039      !
2040   END SUBROUTINE ctl_nam
2041
2042
2043   INTEGER FUNCTION get_unit()
2044      !!----------------------------------------------------------------------
2045      !!                  ***  FUNCTION  get_unit  ***
2046      !!
2047      !! ** Purpose :   return the index of an unused logical unit
2048      !!----------------------------------------------------------------------
2049      LOGICAL :: llopn
2050      !!----------------------------------------------------------------------
2051      !
2052      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2053      llopn = .TRUE.
2054      DO WHILE( (get_unit < 998) .AND. llopn )
2055         get_unit = get_unit + 1
2056         INQUIRE( unit = get_unit, opened = llopn )
2057      END DO
2058      IF( (get_unit == 999) .AND. llopn ) THEN
2059         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2060         get_unit = -1
2061      ENDIF
2062      !
2063   END FUNCTION get_unit
2064
2065   !!----------------------------------------------------------------------
2066END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.