New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r10721_KERNEL-02_Storkey_Coward_IMMERSE_first_steps/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r10721_KERNEL-02_Storkey_Coward_IMMERSE_first_steps/src/OCE/LBC/lib_mpp.F90 @ 11504

Last change on this file since 11504 was 11504, checked in by davestorkey, 4 years ago

2019/dev_r10721_KERNEL-02_Storkey_Coward_IMMERSE_first_steps : Strip out all references to nn_dttrc
and the trcsub.F90 module. Notes:

  1. This version of the code currently breaks the GYRE_PISCES test in SETTE.
  2. With the removal of this option, TOP should use the OCE time index variables, eg. Nbb_trc -> Nbb, nittrc000 -> nit0000 etc.
  • Property svn:keywords set to Id
File size: 90.1 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 3000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
165   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
166   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
167   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
168   !: name (used as id) of allreduce-delayed operations
169   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
170   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
171   !: component name where the allreduce-delayed operation is performed
172   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
173   TYPE, PUBLIC ::   DELAYARR
174      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
175      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
176   END TYPE DELAYARR
177   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
178   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
179
180   ! timing summary report
181   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
182   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
183   
184   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
185
186   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
187   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
188
189   !!----------------------------------------------------------------------
190   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
191   !! $Id$
192   !! Software governed by the CeCILL license (see ./LICENSE)
193   !!----------------------------------------------------------------------
194CONTAINS
195
196   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
197      !!----------------------------------------------------------------------
198      !!                  ***  routine mynode  ***
199      !!
200      !! ** Purpose :   Find processor unit
201      !!----------------------------------------------------------------------
202      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
203      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
204      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
205      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
206      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
207      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
208      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
209      !
210      INTEGER ::   mynode, ierr, code, ji, ii, ios
211      LOGICAL ::   mpi_was_called
212      !
213      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
214      !!----------------------------------------------------------------------
215      !
216      ii = 1
217      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
220      !
221      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
222      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
223901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
224      !
225      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
226      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
227902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
228      !
229      !                              ! control print
230      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
231      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
232      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
233      !
234      IF( jpni < 1 .OR. jpnj < 1  ) THEN
235         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
236      ELSE
237         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
238         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
239      ENDIF
240
241      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
242
243      CALL mpi_initialized ( mpi_was_called, code )
244      IF( code /= MPI_SUCCESS ) THEN
245         DO ji = 1, SIZE(ldtxt)
246            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
247         END DO
248         WRITE(*, cform_err)
249         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
250         CALL mpi_abort( mpi_comm_world, code, ierr )
251      ENDIF
252
253      IF( mpi_was_called ) THEN
254         !
255         SELECT CASE ( cn_mpi_send )
256         CASE ( 'S' )                ! Standard mpi send (blocking)
257            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
258         CASE ( 'B' )                ! Buffer mpi send (blocking)
259            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
260            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
261         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
262            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
263            l_isend = .TRUE.
264         CASE DEFAULT
265            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
266            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
267            kstop = kstop + 1
268         END SELECT
269         !
270      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
271         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
272         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
273         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
274         kstop = kstop + 1
275      ELSE
276         SELECT CASE ( cn_mpi_send )
277         CASE ( 'S' )                ! Standard mpi send (blocking)
278            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
279            CALL mpi_init( ierr )
280         CASE ( 'B' )                ! Buffer mpi send (blocking)
281            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
282            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
283         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
284            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
285            l_isend = .TRUE.
286            CALL mpi_init( ierr )
287         CASE DEFAULT
288            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
289            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
290            kstop = kstop + 1
291         END SELECT
292         !
293      ENDIF
294
295      IF( PRESENT(localComm) ) THEN
296         IF( Agrif_Root() ) THEN
297            mpi_comm_oce = localComm
298         ENDIF
299      ELSE
300         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
301         IF( code /= MPI_SUCCESS ) THEN
302            DO ji = 1, SIZE(ldtxt)
303               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
304            END DO
305            WRITE(*, cform_err)
306            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
307            CALL mpi_abort( mpi_comm_world, code, ierr )
308         ENDIF
309      ENDIF
310
311#if defined key_agrif
312      IF( Agrif_Root() ) THEN
313         CALL Agrif_MPI_Init(mpi_comm_oce)
314      ELSE
315         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
316      ENDIF
317#endif
318
319      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
320      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
321      mynode = mpprank
322
323      IF( mynode == 0 ) THEN
324         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
325         WRITE(kumond, nammpp)     
326      ENDIF
327      !
328      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
329      !
330   END FUNCTION mynode
331
332   !!----------------------------------------------------------------------
333   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
334   !!
335   !!   * Argument : dummy argument use in mpp_lnk_... routines
336   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
337   !!                cd_nat :   nature of array grid-points
338   !!                psgn   :   sign used across the north fold boundary
339   !!                kfld   :   optional, number of pt3d arrays
340   !!                cd_mpp :   optional, fill the overlap area only
341   !!                pval   :   optional, background value (used at closed boundaries)
342   !!----------------------------------------------------------------------
343   !
344   !                       !==  2D array and array of 2D pointer  ==!
345   !
346#  define DIM_2d
347#     define ROUTINE_LNK           mpp_lnk_2d
348#     include "mpp_lnk_generic.h90"
349#     undef ROUTINE_LNK
350#     define MULTI
351#     define ROUTINE_LNK           mpp_lnk_2d_ptr
352#     include "mpp_lnk_generic.h90"
353#     undef ROUTINE_LNK
354#     undef MULTI
355#  undef DIM_2d
356   !
357   !                       !==  3D array and array of 3D pointer  ==!
358   !
359#  define DIM_3d
360#     define ROUTINE_LNK           mpp_lnk_3d
361#     include "mpp_lnk_generic.h90"
362#     undef ROUTINE_LNK
363#     define MULTI
364#     define ROUTINE_LNK           mpp_lnk_3d_ptr
365#     include "mpp_lnk_generic.h90"
366#     undef ROUTINE_LNK
367#     undef MULTI
368#  undef DIM_3d
369   !
370   !                       !==  4D array and array of 4D pointer  ==!
371   !
372#  define DIM_4d
373#     define ROUTINE_LNK           mpp_lnk_4d
374#     include "mpp_lnk_generic.h90"
375#     undef ROUTINE_LNK
376#     define MULTI
377#     define ROUTINE_LNK           mpp_lnk_4d_ptr
378#     include "mpp_lnk_generic.h90"
379#     undef ROUTINE_LNK
380#     undef MULTI
381#  undef DIM_4d
382
383   !!----------------------------------------------------------------------
384   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
385   !!
386   !!   * Argument : dummy argument use in mpp_nfd_... routines
387   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
388   !!                cd_nat :   nature of array grid-points
389   !!                psgn   :   sign used across the north fold boundary
390   !!                kfld   :   optional, number of pt3d arrays
391   !!                cd_mpp :   optional, fill the overlap area only
392   !!                pval   :   optional, background value (used at closed boundaries)
393   !!----------------------------------------------------------------------
394   !
395   !                       !==  2D array and array of 2D pointer  ==!
396   !
397#  define DIM_2d
398#     define ROUTINE_NFD           mpp_nfd_2d
399#     include "mpp_nfd_generic.h90"
400#     undef ROUTINE_NFD
401#     define MULTI
402#     define ROUTINE_NFD           mpp_nfd_2d_ptr
403#     include "mpp_nfd_generic.h90"
404#     undef ROUTINE_NFD
405#     undef MULTI
406#  undef DIM_2d
407   !
408   !                       !==  3D array and array of 3D pointer  ==!
409   !
410#  define DIM_3d
411#     define ROUTINE_NFD           mpp_nfd_3d
412#     include "mpp_nfd_generic.h90"
413#     undef ROUTINE_NFD
414#     define MULTI
415#     define ROUTINE_NFD           mpp_nfd_3d_ptr
416#     include "mpp_nfd_generic.h90"
417#     undef ROUTINE_NFD
418#     undef MULTI
419#  undef DIM_3d
420   !
421   !                       !==  4D array and array of 4D pointer  ==!
422   !
423#  define DIM_4d
424#     define ROUTINE_NFD           mpp_nfd_4d
425#     include "mpp_nfd_generic.h90"
426#     undef ROUTINE_NFD
427#     define MULTI
428#     define ROUTINE_NFD           mpp_nfd_4d_ptr
429#     include "mpp_nfd_generic.h90"
430#     undef ROUTINE_NFD
431#     undef MULTI
432#  undef DIM_4d
433
434
435   !!----------------------------------------------------------------------
436   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
437   !!
438   !!   * Argument : dummy argument use in mpp_lnk_... routines
439   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
440   !!                cd_nat :   nature of array grid-points
441   !!                psgn   :   sign used across the north fold boundary
442   !!                kb_bdy :   BDY boundary set
443   !!                kfld   :   optional, number of pt3d arrays
444   !!----------------------------------------------------------------------
445   !
446   !                       !==  2D array and array of 2D pointer  ==!
447   !
448#  define DIM_2d
449#     define ROUTINE_BDY           mpp_lnk_bdy_2d
450#     include "mpp_bdy_generic.h90"
451#     undef ROUTINE_BDY
452#  undef DIM_2d
453   !
454   !                       !==  3D array and array of 3D pointer  ==!
455   !
456#  define DIM_3d
457#     define ROUTINE_BDY           mpp_lnk_bdy_3d
458#     include "mpp_bdy_generic.h90"
459#     undef ROUTINE_BDY
460#  undef DIM_3d
461   !
462   !                       !==  4D array and array of 4D pointer  ==!
463   !
464#  define DIM_4d
465#     define ROUTINE_BDY           mpp_lnk_bdy_4d
466#     include "mpp_bdy_generic.h90"
467#     undef ROUTINE_BDY
468#  undef DIM_4d
469
470   !!----------------------------------------------------------------------
471   !!
472   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
473   
474   
475   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
476   
477   
478   !!----------------------------------------------------------------------
479
480
481
482   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
483      !!----------------------------------------------------------------------
484      !!                  ***  routine mppsend  ***
485      !!
486      !! ** Purpose :   Send messag passing array
487      !!
488      !!----------------------------------------------------------------------
489      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
490      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
491      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
492      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
493      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
494      !!
495      INTEGER ::   iflag
496      !!----------------------------------------------------------------------
497      !
498      SELECT CASE ( cn_mpi_send )
499      CASE ( 'S' )                ! Standard mpi send (blocking)
500         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
501      CASE ( 'B' )                ! Buffer mpi send (blocking)
502         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
503      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
504         ! be carefull, one more argument here : the mpi request identifier..
505         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
506      END SELECT
507      !
508   END SUBROUTINE mppsend
509
510
511   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
512      !!----------------------------------------------------------------------
513      !!                  ***  routine mpprecv  ***
514      !!
515      !! ** Purpose :   Receive messag passing array
516      !!
517      !!----------------------------------------------------------------------
518      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
519      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
520      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
521      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
522      !!
523      INTEGER :: istatus(mpi_status_size)
524      INTEGER :: iflag
525      INTEGER :: use_source
526      !!----------------------------------------------------------------------
527      !
528      ! If a specific process number has been passed to the receive call,
529      ! use that one. Default is to use mpi_any_source
530      use_source = mpi_any_source
531      IF( PRESENT(ksource) )   use_source = ksource
532      !
533      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
534      !
535   END SUBROUTINE mpprecv
536
537
538   SUBROUTINE mppgather( ptab, kp, pio )
539      !!----------------------------------------------------------------------
540      !!                   ***  routine mppgather  ***
541      !!
542      !! ** Purpose :   Transfert between a local subdomain array and a work
543      !!     array which is distributed following the vertical level.
544      !!
545      !!----------------------------------------------------------------------
546      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
547      INTEGER                           , INTENT(in   ) ::   kp     ! record length
548      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
549      !!
550      INTEGER :: itaille, ierror   ! temporary integer
551      !!---------------------------------------------------------------------
552      !
553      itaille = jpi * jpj
554      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
555         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
556      !
557   END SUBROUTINE mppgather
558
559
560   SUBROUTINE mppscatter( pio, kp, ptab )
561      !!----------------------------------------------------------------------
562      !!                  ***  routine mppscatter  ***
563      !!
564      !! ** Purpose :   Transfert between awork array which is distributed
565      !!      following the vertical level and the local subdomain array.
566      !!
567      !!----------------------------------------------------------------------
568      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
569      INTEGER                             ::   kp     ! Tag (not used with MPI
570      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
571      !!
572      INTEGER :: itaille, ierror   ! temporary integer
573      !!---------------------------------------------------------------------
574      !
575      itaille = jpi * jpj
576      !
577      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
578         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
579      !
580   END SUBROUTINE mppscatter
581
582   
583   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
584     !!----------------------------------------------------------------------
585      !!                   ***  routine mpp_delay_sum  ***
586      !!
587      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
588      !!
589      !!----------------------------------------------------------------------
590      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
591      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
592      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
593      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
594      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
595      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
596      !!
597      INTEGER ::   ji, isz
598      INTEGER ::   idvar
599      INTEGER ::   ierr, ilocalcomm
600      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
601      !!----------------------------------------------------------------------
602      ilocalcomm = mpi_comm_oce
603      IF( PRESENT(kcom) )   ilocalcomm = kcom
604
605      isz = SIZE(y_in)
606     
607      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
608
609      idvar = -1
610      DO ji = 1, nbdelay
611         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
612      END DO
613      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
614
615      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
616         !                                       --------------------------
617         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
618            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
619            DEALLOCATE(todelay(idvar)%z1d)
620            ndelayid(idvar) = -1                                      ! do as if we had no restart
621         ELSE
622            ALLOCATE(todelay(idvar)%y1d(isz))
623            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
624         END IF
625      ENDIF
626     
627      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
628         !                                       --------------------------
629         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
630         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
631         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
632      ENDIF
633
634      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
635
636      ! send back pout from todelay(idvar)%z1d defined at previous call
637      pout(:) = todelay(idvar)%z1d(:)
638
639      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
640#if defined key_mpi2
641      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
642      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
643      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
644#else
645      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
646#endif
647
648   END SUBROUTINE mpp_delay_sum
649
650   
651   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
652      !!----------------------------------------------------------------------
653      !!                   ***  routine mpp_delay_max  ***
654      !!
655      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
656      !!
657      !!----------------------------------------------------------------------
658      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
659      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
660      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
661      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
662      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
663      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
664      !!
665      INTEGER ::   ji, isz
666      INTEGER ::   idvar
667      INTEGER ::   ierr, ilocalcomm
668      !!----------------------------------------------------------------------
669      ilocalcomm = mpi_comm_oce
670      IF( PRESENT(kcom) )   ilocalcomm = kcom
671
672      isz = SIZE(p_in)
673
674      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
675
676      idvar = -1
677      DO ji = 1, nbdelay
678         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
679      END DO
680      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
681
682      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
683         !                                       --------------------------
684         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
685            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
686            DEALLOCATE(todelay(idvar)%z1d)
687            ndelayid(idvar) = -1                                      ! do as if we had no restart
688         END IF
689      ENDIF
690
691      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
692         !                                       --------------------------
693         ALLOCATE(todelay(idvar)%z1d(isz))
694         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
695      ENDIF
696
697      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
698
699      ! send back pout from todelay(idvar)%z1d defined at previous call
700      pout(:) = todelay(idvar)%z1d(:)
701
702      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
703#if defined key_mpi2
704      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
705      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
706      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
707#else
708      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
709#endif
710
711   END SUBROUTINE mpp_delay_max
712
713   
714   SUBROUTINE mpp_delay_rcv( kid )
715      !!----------------------------------------------------------------------
716      !!                   ***  routine mpp_delay_rcv  ***
717      !!
718      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
719      !!
720      !!----------------------------------------------------------------------
721      INTEGER,INTENT(in   )      ::  kid 
722      INTEGER ::   ierr
723      !!----------------------------------------------------------------------
724      IF( ndelayid(kid) /= -2 ) THEN 
725#if ! defined key_mpi2
726         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
727         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
728         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
729#endif
730         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
731         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
732      ENDIF
733   END SUBROUTINE mpp_delay_rcv
734
735   
736   !!----------------------------------------------------------------------
737   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
738   !!   
739   !!----------------------------------------------------------------------
740   !!
741#  define OPERATION_MAX
742#  define INTEGER_TYPE
743#  define DIM_0d
744#     define ROUTINE_ALLREDUCE           mppmax_int
745#     include "mpp_allreduce_generic.h90"
746#     undef ROUTINE_ALLREDUCE
747#  undef DIM_0d
748#  define DIM_1d
749#     define ROUTINE_ALLREDUCE           mppmax_a_int
750#     include "mpp_allreduce_generic.h90"
751#     undef ROUTINE_ALLREDUCE
752#  undef DIM_1d
753#  undef INTEGER_TYPE
754!
755#  define REAL_TYPE
756#  define DIM_0d
757#     define ROUTINE_ALLREDUCE           mppmax_real
758#     include "mpp_allreduce_generic.h90"
759#     undef ROUTINE_ALLREDUCE
760#  undef DIM_0d
761#  define DIM_1d
762#     define ROUTINE_ALLREDUCE           mppmax_a_real
763#     include "mpp_allreduce_generic.h90"
764#     undef ROUTINE_ALLREDUCE
765#  undef DIM_1d
766#  undef REAL_TYPE
767#  undef OPERATION_MAX
768   !!----------------------------------------------------------------------
769   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
770   !!   
771   !!----------------------------------------------------------------------
772   !!
773#  define OPERATION_MIN
774#  define INTEGER_TYPE
775#  define DIM_0d
776#     define ROUTINE_ALLREDUCE           mppmin_int
777#     include "mpp_allreduce_generic.h90"
778#     undef ROUTINE_ALLREDUCE
779#  undef DIM_0d
780#  define DIM_1d
781#     define ROUTINE_ALLREDUCE           mppmin_a_int
782#     include "mpp_allreduce_generic.h90"
783#     undef ROUTINE_ALLREDUCE
784#  undef DIM_1d
785#  undef INTEGER_TYPE
786!
787#  define REAL_TYPE
788#  define DIM_0d
789#     define ROUTINE_ALLREDUCE           mppmin_real
790#     include "mpp_allreduce_generic.h90"
791#     undef ROUTINE_ALLREDUCE
792#  undef DIM_0d
793#  define DIM_1d
794#     define ROUTINE_ALLREDUCE           mppmin_a_real
795#     include "mpp_allreduce_generic.h90"
796#     undef ROUTINE_ALLREDUCE
797#  undef DIM_1d
798#  undef REAL_TYPE
799#  undef OPERATION_MIN
800
801   !!----------------------------------------------------------------------
802   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
803   !!   
804   !!   Global sum of 1D array or a variable (integer, real or complex)
805   !!----------------------------------------------------------------------
806   !!
807#  define OPERATION_SUM
808#  define INTEGER_TYPE
809#  define DIM_0d
810#     define ROUTINE_ALLREDUCE           mppsum_int
811#     include "mpp_allreduce_generic.h90"
812#     undef ROUTINE_ALLREDUCE
813#  undef DIM_0d
814#  define DIM_1d
815#     define ROUTINE_ALLREDUCE           mppsum_a_int
816#     include "mpp_allreduce_generic.h90"
817#     undef ROUTINE_ALLREDUCE
818#  undef DIM_1d
819#  undef INTEGER_TYPE
820!
821#  define REAL_TYPE
822#  define DIM_0d
823#     define ROUTINE_ALLREDUCE           mppsum_real
824#     include "mpp_allreduce_generic.h90"
825#     undef ROUTINE_ALLREDUCE
826#  undef DIM_0d
827#  define DIM_1d
828#     define ROUTINE_ALLREDUCE           mppsum_a_real
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_1d
832#  undef REAL_TYPE
833#  undef OPERATION_SUM
834
835#  define OPERATION_SUM_DD
836#  define COMPLEX_TYPE
837#  define DIM_0d
838#     define ROUTINE_ALLREDUCE           mppsum_realdd
839#     include "mpp_allreduce_generic.h90"
840#     undef ROUTINE_ALLREDUCE
841#  undef DIM_0d
842#  define DIM_1d
843#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
844#     include "mpp_allreduce_generic.h90"
845#     undef ROUTINE_ALLREDUCE
846#  undef DIM_1d
847#  undef COMPLEX_TYPE
848#  undef OPERATION_SUM_DD
849
850   !!----------------------------------------------------------------------
851   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
852   !!   
853   !!----------------------------------------------------------------------
854   !!
855#  define OPERATION_MINLOC
856#  define DIM_2d
857#     define ROUTINE_LOC           mpp_minloc2d
858#     include "mpp_loc_generic.h90"
859#     undef ROUTINE_LOC
860#  undef DIM_2d
861#  define DIM_3d
862#     define ROUTINE_LOC           mpp_minloc3d
863#     include "mpp_loc_generic.h90"
864#     undef ROUTINE_LOC
865#  undef DIM_3d
866#  undef OPERATION_MINLOC
867
868#  define OPERATION_MAXLOC
869#  define DIM_2d
870#     define ROUTINE_LOC           mpp_maxloc2d
871#     include "mpp_loc_generic.h90"
872#     undef ROUTINE_LOC
873#  undef DIM_2d
874#  define DIM_3d
875#     define ROUTINE_LOC           mpp_maxloc3d
876#     include "mpp_loc_generic.h90"
877#     undef ROUTINE_LOC
878#  undef DIM_3d
879#  undef OPERATION_MAXLOC
880
881   SUBROUTINE mppsync()
882      !!----------------------------------------------------------------------
883      !!                  ***  routine mppsync  ***
884      !!
885      !! ** Purpose :   Massively parallel processors, synchroneous
886      !!
887      !!-----------------------------------------------------------------------
888      INTEGER :: ierror
889      !!-----------------------------------------------------------------------
890      !
891      CALL mpi_barrier( mpi_comm_oce, ierror )
892      !
893   END SUBROUTINE mppsync
894
895
896   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
897      !!----------------------------------------------------------------------
898      !!                  ***  routine mppstop  ***
899      !!
900      !! ** purpose :   Stop massively parallel processors method
901      !!
902      !!----------------------------------------------------------------------
903      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
904      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
905      LOGICAL ::   llfinal, ll_force_abort
906      INTEGER ::   info
907      !!----------------------------------------------------------------------
908      llfinal = .FALSE.
909      IF( PRESENT(ldfinal) ) llfinal = ldfinal
910      ll_force_abort = .FALSE.
911      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
912      !
913      IF(ll_force_abort) THEN
914         CALL mpi_abort( MPI_COMM_WORLD )
915      ELSE
916         CALL mppsync
917         CALL mpi_finalize( info )
918      ENDIF
919      IF( .NOT. llfinal ) STOP 123456
920      !
921   END SUBROUTINE mppstop
922
923
924   SUBROUTINE mpp_comm_free( kcom )
925      !!----------------------------------------------------------------------
926      INTEGER, INTENT(in) ::   kcom
927      !!
928      INTEGER :: ierr
929      !!----------------------------------------------------------------------
930      !
931      CALL MPI_COMM_FREE(kcom, ierr)
932      !
933   END SUBROUTINE mpp_comm_free
934
935
936   SUBROUTINE mpp_ini_znl( kumout )
937      !!----------------------------------------------------------------------
938      !!               ***  routine mpp_ini_znl  ***
939      !!
940      !! ** Purpose :   Initialize special communicator for computing zonal sum
941      !!
942      !! ** Method  : - Look for processors in the same row
943      !!              - Put their number in nrank_znl
944      !!              - Create group for the znl processors
945      !!              - Create a communicator for znl processors
946      !!              - Determine if processor should write znl files
947      !!
948      !! ** output
949      !!      ndim_rank_znl = number of processors on the same row
950      !!      ngrp_znl = group ID for the znl processors
951      !!      ncomm_znl = communicator for the ice procs.
952      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
953      !!
954      !!----------------------------------------------------------------------
955      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
956      !
957      INTEGER :: jproc      ! dummy loop integer
958      INTEGER :: ierr, ii   ! local integer
959      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
960      !!----------------------------------------------------------------------
961      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
962      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
963      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
964      !
965      ALLOCATE( kwork(jpnij), STAT=ierr )
966      IF( ierr /= 0 ) THEN
967         WRITE(kumout, cform_err)
968         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
969         CALL mppstop
970      ENDIF
971
972      IF( jpnj == 1 ) THEN
973         ngrp_znl  = ngrp_world
974         ncomm_znl = mpi_comm_oce
975      ELSE
976         !
977         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
978         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
979         !-$$        CALL flush(numout)
980         !
981         ! Count number of processors on the same row
982         ndim_rank_znl = 0
983         DO jproc=1,jpnij
984            IF ( kwork(jproc) == njmpp ) THEN
985               ndim_rank_znl = ndim_rank_znl + 1
986            ENDIF
987         END DO
988         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
989         !-$$        CALL flush(numout)
990         ! Allocate the right size to nrank_znl
991         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
992         ALLOCATE(nrank_znl(ndim_rank_znl))
993         ii = 0
994         nrank_znl (:) = 0
995         DO jproc=1,jpnij
996            IF ( kwork(jproc) == njmpp) THEN
997               ii = ii + 1
998               nrank_znl(ii) = jproc -1
999            ENDIF
1000         END DO
1001         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1002         !-$$        CALL flush(numout)
1003
1004         ! Create the opa group
1005         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1006         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1007         !-$$        CALL flush(numout)
1008
1009         ! Create the znl group from the opa group
1010         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1011         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1012         !-$$        CALL flush(numout)
1013
1014         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1015         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1016         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1017         !-$$        CALL flush(numout)
1018         !
1019      END IF
1020
1021      ! Determines if processor if the first (starting from i=1) on the row
1022      IF ( jpni == 1 ) THEN
1023         l_znl_root = .TRUE.
1024      ELSE
1025         l_znl_root = .FALSE.
1026         kwork (1) = nimpp
1027         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1028         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1029      END IF
1030
1031      DEALLOCATE(kwork)
1032
1033   END SUBROUTINE mpp_ini_znl
1034
1035
1036   SUBROUTINE mpp_ini_north
1037      !!----------------------------------------------------------------------
1038      !!               ***  routine mpp_ini_north  ***
1039      !!
1040      !! ** Purpose :   Initialize special communicator for north folding
1041      !!      condition together with global variables needed in the mpp folding
1042      !!
1043      !! ** Method  : - Look for northern processors
1044      !!              - Put their number in nrank_north
1045      !!              - Create groups for the world processors and the north processors
1046      !!              - Create a communicator for northern processors
1047      !!
1048      !! ** output
1049      !!      njmppmax = njmpp for northern procs
1050      !!      ndim_rank_north = number of processors in the northern line
1051      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1052      !!      ngrp_world = group ID for the world processors
1053      !!      ngrp_north = group ID for the northern processors
1054      !!      ncomm_north = communicator for the northern procs.
1055      !!      north_root = number (in the world) of proc 0 in the northern comm.
1056      !!
1057      !!----------------------------------------------------------------------
1058      INTEGER ::   ierr
1059      INTEGER ::   jjproc
1060      INTEGER ::   ii, ji
1061      !!----------------------------------------------------------------------
1062      !
1063      njmppmax = MAXVAL( njmppt )
1064      !
1065      ! Look for how many procs on the northern boundary
1066      ndim_rank_north = 0
1067      DO jjproc = 1, jpnij
1068         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1069      END DO
1070      !
1071      ! Allocate the right size to nrank_north
1072      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1073      ALLOCATE( nrank_north(ndim_rank_north) )
1074
1075      ! Fill the nrank_north array with proc. number of northern procs.
1076      ! Note : the rank start at 0 in MPI
1077      ii = 0
1078      DO ji = 1, jpnij
1079         IF ( njmppt(ji) == njmppmax   ) THEN
1080            ii=ii+1
1081            nrank_north(ii)=ji-1
1082         END IF
1083      END DO
1084      !
1085      ! create the world group
1086      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1087      !
1088      ! Create the North group from the world group
1089      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1090      !
1091      ! Create the North communicator , ie the pool of procs in the north group
1092      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1093      !
1094   END SUBROUTINE mpp_ini_north
1095
1096
1097   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1098      !!---------------------------------------------------------------------
1099      !!                   ***  routine mpp_init.opa  ***
1100      !!
1101      !! ** Purpose :: export and attach a MPI buffer for bsend
1102      !!
1103      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1104      !!            but classical mpi_init
1105      !!
1106      !! History :: 01/11 :: IDRIS initial version for IBM only
1107      !!            08/04 :: R. Benshila, generalisation
1108      !!---------------------------------------------------------------------
1109      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1110      INTEGER                      , INTENT(inout) ::   ksft
1111      INTEGER                      , INTENT(  out) ::   code
1112      INTEGER                                      ::   ierr, ji
1113      LOGICAL                                      ::   mpi_was_called
1114      !!---------------------------------------------------------------------
1115      !
1116      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1117      IF ( code /= MPI_SUCCESS ) THEN
1118         DO ji = 1, SIZE(ldtxt)
1119            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1120         END DO
1121         WRITE(*, cform_err)
1122         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1123         CALL mpi_abort( mpi_comm_world, code, ierr )
1124      ENDIF
1125      !
1126      IF( .NOT. mpi_was_called ) THEN
1127         CALL mpi_init( code )
1128         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1129         IF ( code /= MPI_SUCCESS ) THEN
1130            DO ji = 1, SIZE(ldtxt)
1131               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1132            END DO
1133            WRITE(*, cform_err)
1134            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1135            CALL mpi_abort( mpi_comm_world, code, ierr )
1136         ENDIF
1137      ENDIF
1138      !
1139      IF( nn_buffer > 0 ) THEN
1140         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1141         ! Buffer allocation and attachment
1142         ALLOCATE( tampon(nn_buffer), stat = ierr )
1143         IF( ierr /= 0 ) THEN
1144            DO ji = 1, SIZE(ldtxt)
1145               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1146            END DO
1147            WRITE(*, cform_err)
1148            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1149            CALL mpi_abort( mpi_comm_world, code, ierr )
1150         END IF
1151         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1152      ENDIF
1153      !
1154   END SUBROUTINE mpi_init_oce
1155
1156
1157   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1158      !!---------------------------------------------------------------------
1159      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1160      !!
1161      !!   Modification of original codes written by David H. Bailey
1162      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1163      !!---------------------------------------------------------------------
1164      INTEGER                     , INTENT(in)    ::   ilen, itype
1165      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1166      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1167      !
1168      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1169      INTEGER  :: ji, ztmp           ! local scalar
1170      !!---------------------------------------------------------------------
1171      !
1172      ztmp = itype   ! avoid compilation warning
1173      !
1174      DO ji=1,ilen
1175      ! Compute ydda + yddb using Knuth's trick.
1176         zt1  = real(ydda(ji)) + real(yddb(ji))
1177         zerr = zt1 - real(ydda(ji))
1178         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1179                + aimag(ydda(ji)) + aimag(yddb(ji))
1180
1181         ! The result is zt1 + zt2, after normalization.
1182         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1183      END DO
1184      !
1185   END SUBROUTINE DDPDD_MPI
1186
1187
1188   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1189      !!---------------------------------------------------------------------
1190      !!                   ***  routine mpp_lbc_north_icb  ***
1191      !!
1192      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1193      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1194      !!              array with outer extra halo
1195      !!
1196      !! ** Method  :   North fold condition and mpp with more than one proc
1197      !!              in i-direction require a specific treatment. We gather
1198      !!              the 4+kextj northern lines of the global domain on 1
1199      !!              processor and apply lbc north-fold on this sub array.
1200      !!              Then we scatter the north fold array back to the processors.
1201      !!              This routine accounts for an extra halo with icebergs
1202      !!              and assumes ghost rows and columns have been suppressed.
1203      !!
1204      !!----------------------------------------------------------------------
1205      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1206      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1207      !                                                     !   = T ,  U , V , F or W -points
1208      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1209      !!                                                    ! north fold, =  1. otherwise
1210      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1211      !
1212      INTEGER ::   ji, jj, jr
1213      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1214      INTEGER ::   ipj, ij, iproc
1215      !
1216      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1217      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1218      !!----------------------------------------------------------------------
1219      !
1220      ipj=4
1221      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1222     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1223     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1224      !
1225      ztab_e(:,:)      = 0._wp
1226      znorthloc_e(:,:) = 0._wp
1227      !
1228      ij = 1 - kextj
1229      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1230      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1231         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1232         ij = ij + 1
1233      END DO
1234      !
1235      itaille = jpimax * ( ipj + 2*kextj )
1236      !
1237      IF( ln_timing ) CALL tic_tac(.TRUE.)
1238      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1239         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1240         &                ncomm_north, ierr )
1241      !
1242      IF( ln_timing ) CALL tic_tac(.FALSE.)
1243      !
1244      DO jr = 1, ndim_rank_north            ! recover the global north array
1245         iproc = nrank_north(jr) + 1
1246         ildi = nldit (iproc)
1247         ilei = nleit (iproc)
1248         iilb = nimppt(iproc)
1249         DO jj = 1-kextj, ipj+kextj
1250            DO ji = ildi, ilei
1251               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1252            END DO
1253         END DO
1254      END DO
1255
1256      ! 2. North-Fold boundary conditions
1257      ! ----------------------------------
1258      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1259
1260      ij = 1 - kextj
1261      !! Scatter back to pt2d
1262      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1263         DO ji= 1, jpi
1264            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1265         END DO
1266         ij  = ij +1
1267      END DO
1268      !
1269      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1270      !
1271   END SUBROUTINE mpp_lbc_north_icb
1272
1273
1274   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1275      !!----------------------------------------------------------------------
1276      !!                  ***  routine mpp_lnk_2d_icb  ***
1277      !!
1278      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1279      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1280      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1281      !!
1282      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1283      !!      between processors following neighboring subdomains.
1284      !!            domain parameters
1285      !!                    jpi    : first dimension of the local subdomain
1286      !!                    jpj    : second dimension of the local subdomain
1287      !!                    kexti  : number of columns for extra outer halo
1288      !!                    kextj  : number of rows for extra outer halo
1289      !!                    nbondi : mark for "east-west local boundary"
1290      !!                    nbondj : mark for "north-south local boundary"
1291      !!                    noea   : number for local neighboring processors
1292      !!                    nowe   : number for local neighboring processors
1293      !!                    noso   : number for local neighboring processors
1294      !!                    nono   : number for local neighboring processors
1295      !!----------------------------------------------------------------------
1296      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1297      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1298      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1299      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1300      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1301      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1302      !
1303      INTEGER  ::   jl   ! dummy loop indices
1304      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1305      INTEGER  ::   ipreci, iprecj             !   -       -
1306      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1307      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1308      !!
1309      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1310      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1311      !!----------------------------------------------------------------------
1312
1313      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1314      iprecj = nn_hls + kextj
1315
1316      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1317
1318      ! 1. standard boundary treatment
1319      ! ------------------------------
1320      ! Order matters Here !!!!
1321      !
1322      !                                      ! East-West boundaries
1323      !                                           !* Cyclic east-west
1324      IF( l_Iperio ) THEN
1325         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1326         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1327         !
1328      ELSE                                        !* closed
1329         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1330                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1331      ENDIF
1332      !                                      ! North-South boundaries
1333      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1334         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1335         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1336      ELSE                                        !* closed
1337         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1338                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1339      ENDIF
1340      !
1341
1342      ! north fold treatment
1343      ! -----------------------
1344      IF( npolj /= 0 ) THEN
1345         !
1346         SELECT CASE ( jpni )
1347                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1348                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1349         END SELECT
1350         !
1351      ENDIF
1352
1353      ! 2. East and west directions exchange
1354      ! ------------------------------------
1355      ! we play with the neigbours AND the row number because of the periodicity
1356      !
1357      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1358      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1359         iihom = jpi-nreci-kexti
1360         DO jl = 1, ipreci
1361            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1362            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1363         END DO
1364      END SELECT
1365      !
1366      !                           ! Migrations
1367      imigr = ipreci * ( jpj + 2*kextj )
1368      !
1369      IF( ln_timing ) CALL tic_tac(.TRUE.)
1370      !
1371      SELECT CASE ( nbondi )
1372      CASE ( -1 )
1373         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1374         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1375         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1376      CASE ( 0 )
1377         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1378         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1379         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1380         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1381         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1382         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1383      CASE ( 1 )
1384         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1385         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1386         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1387      END SELECT
1388      !
1389      IF( ln_timing ) CALL tic_tac(.FALSE.)
1390      !
1391      !                           ! Write Dirichlet lateral conditions
1392      iihom = jpi - nn_hls
1393      !
1394      SELECT CASE ( nbondi )
1395      CASE ( -1 )
1396         DO jl = 1, ipreci
1397            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1398         END DO
1399      CASE ( 0 )
1400         DO jl = 1, ipreci
1401            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1402            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1403         END DO
1404      CASE ( 1 )
1405         DO jl = 1, ipreci
1406            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1407         END DO
1408      END SELECT
1409
1410
1411      ! 3. North and south directions
1412      ! -----------------------------
1413      ! always closed : we play only with the neigbours
1414      !
1415      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1416         ijhom = jpj-nrecj-kextj
1417         DO jl = 1, iprecj
1418            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1419            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1420         END DO
1421      ENDIF
1422      !
1423      !                           ! Migrations
1424      imigr = iprecj * ( jpi + 2*kexti )
1425      !
1426      IF( ln_timing ) CALL tic_tac(.TRUE.)
1427      !
1428      SELECT CASE ( nbondj )
1429      CASE ( -1 )
1430         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1431         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1432         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1433      CASE ( 0 )
1434         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1435         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1436         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1437         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1438         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1439         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1440      CASE ( 1 )
1441         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1442         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1443         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1444      END SELECT
1445      !
1446      IF( ln_timing ) CALL tic_tac(.FALSE.)
1447      !
1448      !                           ! Write Dirichlet lateral conditions
1449      ijhom = jpj - nn_hls
1450      !
1451      SELECT CASE ( nbondj )
1452      CASE ( -1 )
1453         DO jl = 1, iprecj
1454            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1455         END DO
1456      CASE ( 0 )
1457         DO jl = 1, iprecj
1458            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1459            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1460         END DO
1461      CASE ( 1 )
1462         DO jl = 1, iprecj
1463            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1464         END DO
1465      END SELECT
1466      !
1467   END SUBROUTINE mpp_lnk_2d_icb
1468
1469
1470   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1471      !!----------------------------------------------------------------------
1472      !!                  ***  routine mpp_report  ***
1473      !!
1474      !! ** Purpose :   report use of mpp routines per time-setp
1475      !!
1476      !!----------------------------------------------------------------------
1477      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1478      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1479      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1480      !!
1481      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1482      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1483      !!----------------------------------------------------------------------
1484      !
1485      ll_lbc = .FALSE.
1486      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1487      ll_glb = .FALSE.
1488      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1489      ll_dlg = .FALSE.
1490      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1491      !
1492      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1493      ncom_freq = ncom_fsbc
1494      !
1495      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1496         IF( ll_lbc ) THEN
1497            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1498            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1499            n_sequence_lbc = n_sequence_lbc + 1
1500            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1501            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1502            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1503            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1504         ENDIF
1505         IF( ll_glb ) THEN
1506            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1507            n_sequence_glb = n_sequence_glb + 1
1508            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1509            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1510         ENDIF
1511         IF( ll_dlg ) THEN
1512            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1513            n_sequence_dlg = n_sequence_dlg + 1
1514            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1515            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1516         ENDIF
1517      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1518         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1519         WRITE(numcom,*) ' '
1520         WRITE(numcom,*) ' ------------------------------------------------------------'
1521         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1522         WRITE(numcom,*) ' ------------------------------------------------------------'
1523         WRITE(numcom,*) ' '
1524         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1525         jj = 0; jk = 0; jf = 0; jh = 0
1526         DO ji = 1, n_sequence_lbc
1527            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1528            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1529            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1530            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1531         END DO
1532         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1533         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1534         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1535         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1536         WRITE(numcom,*) ' '
1537         WRITE(numcom,*) ' lbc_lnk called'
1538         jj = 1
1539         DO ji = 2, n_sequence_lbc
1540            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1541               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1542               jj = 0
1543            END IF
1544            jj = jj + 1 
1545         END DO
1546         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1547         WRITE(numcom,*) ' '
1548         IF ( n_sequence_glb > 0 ) THEN
1549            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1550            jj = 1
1551            DO ji = 2, n_sequence_glb
1552               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1553                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1554                  jj = 0
1555               END IF
1556               jj = jj + 1 
1557            END DO
1558            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1559            DEALLOCATE(crname_glb)
1560         ELSE
1561            WRITE(numcom,*) ' No MPI global communication '
1562         ENDIF
1563         WRITE(numcom,*) ' '
1564         IF ( n_sequence_dlg > 0 ) THEN
1565            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1566            jj = 1
1567            DO ji = 2, n_sequence_dlg
1568               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1569                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1570                  jj = 0
1571               END IF
1572               jj = jj + 1 
1573            END DO
1574            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1575            DEALLOCATE(crname_dlg)
1576         ELSE
1577            WRITE(numcom,*) ' No MPI delayed global communication '
1578         ENDIF
1579         WRITE(numcom,*) ' '
1580         WRITE(numcom,*) ' -----------------------------------------------'
1581         WRITE(numcom,*) ' '
1582         DEALLOCATE(ncomm_sequence)
1583         DEALLOCATE(crname_lbc)
1584      ENDIF
1585   END SUBROUTINE mpp_report
1586
1587   
1588   SUBROUTINE tic_tac (ld_tic, ld_global)
1589
1590    LOGICAL,           INTENT(IN) :: ld_tic
1591    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1592    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1593    REAL(wp),               SAVE :: tic_ct = 0._wp
1594    INTEGER :: ii
1595
1596    IF( ncom_stp <= nit000 ) RETURN
1597    IF( ncom_stp == nitend ) RETURN
1598    ii = 1
1599    IF( PRESENT( ld_global ) ) THEN
1600       IF( ld_global ) ii = 2
1601    END IF
1602   
1603    IF ( ld_tic ) THEN
1604       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1605       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1606    ELSE
1607       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1608       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1609    ENDIF
1610   
1611   END SUBROUTINE tic_tac
1612
1613   
1614#else
1615   !!----------------------------------------------------------------------
1616   !!   Default case:            Dummy module        share memory computing
1617   !!----------------------------------------------------------------------
1618   USE in_out_manager
1619
1620   INTERFACE mpp_sum
1621      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1622   END INTERFACE
1623   INTERFACE mpp_max
1624      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1625   END INTERFACE
1626   INTERFACE mpp_min
1627      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1628   END INTERFACE
1629   INTERFACE mpp_minloc
1630      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1631   END INTERFACE
1632   INTERFACE mpp_maxloc
1633      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1634   END INTERFACE
1635
1636   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1637   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1638   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1639
1640   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1641   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1642   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1643   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1644   TYPE ::   DELAYARR
1645      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1646      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1647   END TYPE DELAYARR
1648   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1649   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1650   !!----------------------------------------------------------------------
1651CONTAINS
1652
1653   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1654      INTEGER, INTENT(in) ::   kumout
1655      lib_mpp_alloc = 0
1656   END FUNCTION lib_mpp_alloc
1657
1658   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1659      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1660      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1661      CHARACTER(len=*) ::   ldname
1662      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1663      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1664      function_value = 0
1665      IF( .FALSE. )   ldtxt(:) = 'never done'
1666      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1667   END FUNCTION mynode
1668
1669   SUBROUTINE mppsync                       ! Dummy routine
1670   END SUBROUTINE mppsync
1671
1672   !!----------------------------------------------------------------------
1673   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1674   !!   
1675   !!----------------------------------------------------------------------
1676   !!
1677#  define OPERATION_MAX
1678#  define INTEGER_TYPE
1679#  define DIM_0d
1680#     define ROUTINE_ALLREDUCE           mppmax_int
1681#     include "mpp_allreduce_generic.h90"
1682#     undef ROUTINE_ALLREDUCE
1683#  undef DIM_0d
1684#  define DIM_1d
1685#     define ROUTINE_ALLREDUCE           mppmax_a_int
1686#     include "mpp_allreduce_generic.h90"
1687#     undef ROUTINE_ALLREDUCE
1688#  undef DIM_1d
1689#  undef INTEGER_TYPE
1690!
1691#  define REAL_TYPE
1692#  define DIM_0d
1693#     define ROUTINE_ALLREDUCE           mppmax_real
1694#     include "mpp_allreduce_generic.h90"
1695#     undef ROUTINE_ALLREDUCE
1696#  undef DIM_0d
1697#  define DIM_1d
1698#     define ROUTINE_ALLREDUCE           mppmax_a_real
1699#     include "mpp_allreduce_generic.h90"
1700#     undef ROUTINE_ALLREDUCE
1701#  undef DIM_1d
1702#  undef REAL_TYPE
1703#  undef OPERATION_MAX
1704   !!----------------------------------------------------------------------
1705   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1706   !!   
1707   !!----------------------------------------------------------------------
1708   !!
1709#  define OPERATION_MIN
1710#  define INTEGER_TYPE
1711#  define DIM_0d
1712#     define ROUTINE_ALLREDUCE           mppmin_int
1713#     include "mpp_allreduce_generic.h90"
1714#     undef ROUTINE_ALLREDUCE
1715#  undef DIM_0d
1716#  define DIM_1d
1717#     define ROUTINE_ALLREDUCE           mppmin_a_int
1718#     include "mpp_allreduce_generic.h90"
1719#     undef ROUTINE_ALLREDUCE
1720#  undef DIM_1d
1721#  undef INTEGER_TYPE
1722!
1723#  define REAL_TYPE
1724#  define DIM_0d
1725#     define ROUTINE_ALLREDUCE           mppmin_real
1726#     include "mpp_allreduce_generic.h90"
1727#     undef ROUTINE_ALLREDUCE
1728#  undef DIM_0d
1729#  define DIM_1d
1730#     define ROUTINE_ALLREDUCE           mppmin_a_real
1731#     include "mpp_allreduce_generic.h90"
1732#     undef ROUTINE_ALLREDUCE
1733#  undef DIM_1d
1734#  undef REAL_TYPE
1735#  undef OPERATION_MIN
1736
1737   !!----------------------------------------------------------------------
1738   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1739   !!   
1740   !!   Global sum of 1D array or a variable (integer, real or complex)
1741   !!----------------------------------------------------------------------
1742   !!
1743#  define OPERATION_SUM
1744#  define INTEGER_TYPE
1745#  define DIM_0d
1746#     define ROUTINE_ALLREDUCE           mppsum_int
1747#     include "mpp_allreduce_generic.h90"
1748#     undef ROUTINE_ALLREDUCE
1749#  undef DIM_0d
1750#  define DIM_1d
1751#     define ROUTINE_ALLREDUCE           mppsum_a_int
1752#     include "mpp_allreduce_generic.h90"
1753#     undef ROUTINE_ALLREDUCE
1754#  undef DIM_1d
1755#  undef INTEGER_TYPE
1756!
1757#  define REAL_TYPE
1758#  define DIM_0d
1759#     define ROUTINE_ALLREDUCE           mppsum_real
1760#     include "mpp_allreduce_generic.h90"
1761#     undef ROUTINE_ALLREDUCE
1762#  undef DIM_0d
1763#  define DIM_1d
1764#     define ROUTINE_ALLREDUCE           mppsum_a_real
1765#     include "mpp_allreduce_generic.h90"
1766#     undef ROUTINE_ALLREDUCE
1767#  undef DIM_1d
1768#  undef REAL_TYPE
1769#  undef OPERATION_SUM
1770
1771#  define OPERATION_SUM_DD
1772#  define COMPLEX_TYPE
1773#  define DIM_0d
1774#     define ROUTINE_ALLREDUCE           mppsum_realdd
1775#     include "mpp_allreduce_generic.h90"
1776#     undef ROUTINE_ALLREDUCE
1777#  undef DIM_0d
1778#  define DIM_1d
1779#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1780#     include "mpp_allreduce_generic.h90"
1781#     undef ROUTINE_ALLREDUCE
1782#  undef DIM_1d
1783#  undef COMPLEX_TYPE
1784#  undef OPERATION_SUM_DD
1785
1786   !!----------------------------------------------------------------------
1787   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1788   !!   
1789   !!----------------------------------------------------------------------
1790   !!
1791#  define OPERATION_MINLOC
1792#  define DIM_2d
1793#     define ROUTINE_LOC           mpp_minloc2d
1794#     include "mpp_loc_generic.h90"
1795#     undef ROUTINE_LOC
1796#  undef DIM_2d
1797#  define DIM_3d
1798#     define ROUTINE_LOC           mpp_minloc3d
1799#     include "mpp_loc_generic.h90"
1800#     undef ROUTINE_LOC
1801#  undef DIM_3d
1802#  undef OPERATION_MINLOC
1803
1804#  define OPERATION_MAXLOC
1805#  define DIM_2d
1806#     define ROUTINE_LOC           mpp_maxloc2d
1807#     include "mpp_loc_generic.h90"
1808#     undef ROUTINE_LOC
1809#  undef DIM_2d
1810#  define DIM_3d
1811#     define ROUTINE_LOC           mpp_maxloc3d
1812#     include "mpp_loc_generic.h90"
1813#     undef ROUTINE_LOC
1814#  undef DIM_3d
1815#  undef OPERATION_MAXLOC
1816
1817   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1818      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1819      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1820      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1821      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1822      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1823      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1824      !
1825      pout(:) = REAL(y_in(:), wp)
1826   END SUBROUTINE mpp_delay_sum
1827
1828   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1829      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1830      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1831      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1832      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1833      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1834      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1835      !
1836      pout(:) = p_in(:)
1837   END SUBROUTINE mpp_delay_max
1838
1839   SUBROUTINE mpp_delay_rcv( kid )
1840      INTEGER,INTENT(in   )      ::  kid 
1841      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1842   END SUBROUTINE mpp_delay_rcv
1843   
1844   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1845      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1846      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1847      STOP      ! non MPP case, just stop the run
1848   END SUBROUTINE mppstop
1849
1850   SUBROUTINE mpp_ini_znl( knum )
1851      INTEGER :: knum
1852      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1853   END SUBROUTINE mpp_ini_znl
1854
1855   SUBROUTINE mpp_comm_free( kcom )
1856      INTEGER :: kcom
1857      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1858   END SUBROUTINE mpp_comm_free
1859   
1860#endif
1861
1862   !!----------------------------------------------------------------------
1863   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1864   !!----------------------------------------------------------------------
1865
1866   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1867      &                 cd6, cd7, cd8, cd9, cd10 )
1868      !!----------------------------------------------------------------------
1869      !!                  ***  ROUTINE  stop_opa  ***
1870      !!
1871      !! ** Purpose :   print in ocean.outpput file a error message and
1872      !!                increment the error number (nstop) by one.
1873      !!----------------------------------------------------------------------
1874      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1875      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1876      !!----------------------------------------------------------------------
1877      !
1878      nstop = nstop + 1
1879
1880      ! force to open ocean.output file
1881      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1882       
1883      WRITE(numout,cform_err)
1884      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1885      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1886      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1887      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1888      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1889      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1890      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1891      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1892      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1893      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1894
1895                               CALL FLUSH(numout    )
1896      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1897      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1898      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1899      !
1900      IF( cd1 == 'STOP' ) THEN
1901         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1902         CALL mppstop(ld_force_abort = .true.)
1903      ENDIF
1904      !
1905   END SUBROUTINE ctl_stop
1906
1907
1908   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1909      &                 cd6, cd7, cd8, cd9, cd10 )
1910      !!----------------------------------------------------------------------
1911      !!                  ***  ROUTINE  stop_warn  ***
1912      !!
1913      !! ** Purpose :   print in ocean.outpput file a error message and
1914      !!                increment the warning number (nwarn) by one.
1915      !!----------------------------------------------------------------------
1916      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1917      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1918      !!----------------------------------------------------------------------
1919      !
1920      nwarn = nwarn + 1
1921      IF(lwp) THEN
1922         WRITE(numout,cform_war)
1923         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1924         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1925         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1926         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1927         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1928         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1929         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1930         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1931         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1932         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1933      ENDIF
1934      CALL FLUSH(numout)
1935      !
1936   END SUBROUTINE ctl_warn
1937
1938
1939   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1940      !!----------------------------------------------------------------------
1941      !!                  ***  ROUTINE ctl_opn  ***
1942      !!
1943      !! ** Purpose :   Open file and check if required file is available.
1944      !!
1945      !! ** Method  :   Fortan open
1946      !!----------------------------------------------------------------------
1947      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1948      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1949      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1950      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1951      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1952      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1953      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1954      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1955      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1956      !
1957      CHARACTER(len=80) ::   clfile
1958      INTEGER           ::   iost
1959      !!----------------------------------------------------------------------
1960      !
1961      ! adapt filename
1962      ! ----------------
1963      clfile = TRIM(cdfile)
1964      IF( PRESENT( karea ) ) THEN
1965         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1966      ENDIF
1967#if defined key_agrif
1968      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1969      knum=Agrif_Get_Unit()
1970#else
1971      knum=get_unit()
1972#endif
1973      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1974      !
1975      iost=0
1976      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1977         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1978      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1979         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1980      ELSE
1981         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1982      ENDIF
1983      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1984         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1985      IF( iost == 0 ) THEN
1986         IF(ldwp) THEN
1987            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1988            WRITE(kout,*) '     unit   = ', knum
1989            WRITE(kout,*) '     status = ', cdstat
1990            WRITE(kout,*) '     form   = ', cdform
1991            WRITE(kout,*) '     access = ', cdacce
1992            WRITE(kout,*)
1993         ENDIF
1994      ENDIF
1995100   CONTINUE
1996      IF( iost /= 0 ) THEN
1997         IF(ldwp) THEN
1998            WRITE(kout,*)
1999            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2000            WRITE(kout,*) ' =======   ===  '
2001            WRITE(kout,*) '           unit   = ', knum
2002            WRITE(kout,*) '           status = ', cdstat
2003            WRITE(kout,*) '           form   = ', cdform
2004            WRITE(kout,*) '           access = ', cdacce
2005            WRITE(kout,*) '           iostat = ', iost
2006            WRITE(kout,*) '           we stop. verify the file '
2007            WRITE(kout,*)
2008         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
2009            WRITE(*,*)
2010            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2011            WRITE(*,*) ' =======   ===  '
2012            WRITE(*,*) '           unit   = ', knum
2013            WRITE(*,*) '           status = ', cdstat
2014            WRITE(*,*) '           form   = ', cdform
2015            WRITE(*,*) '           access = ', cdacce
2016            WRITE(*,*) '           iostat = ', iost
2017            WRITE(*,*) '           we stop. verify the file '
2018            WRITE(*,*)
2019         ENDIF
2020         CALL FLUSH( kout ) 
2021         STOP 'ctl_opn bad opening'
2022      ENDIF
2023      !
2024   END SUBROUTINE ctl_opn
2025
2026
2027   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2028      !!----------------------------------------------------------------------
2029      !!                  ***  ROUTINE ctl_nam  ***
2030      !!
2031      !! ** Purpose :   Informations when error while reading a namelist
2032      !!
2033      !! ** Method  :   Fortan open
2034      !!----------------------------------------------------------------------
2035      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2036      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2037      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2038      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2039      !!----------------------------------------------------------------------
2040      !
2041      WRITE (clios, '(I5.0)')   kios
2042      IF( kios < 0 ) THEN         
2043         CALL ctl_warn( 'end of record or file while reading namelist '   &
2044            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2045      ENDIF
2046      !
2047      IF( kios > 0 ) THEN
2048         CALL ctl_stop( 'misspelled variable in namelist '   &
2049            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2050      ENDIF
2051      kios = 0
2052      RETURN
2053      !
2054   END SUBROUTINE ctl_nam
2055
2056
2057   INTEGER FUNCTION get_unit()
2058      !!----------------------------------------------------------------------
2059      !!                  ***  FUNCTION  get_unit  ***
2060      !!
2061      !! ** Purpose :   return the index of an unused logical unit
2062      !!----------------------------------------------------------------------
2063      LOGICAL :: llopn
2064      !!----------------------------------------------------------------------
2065      !
2066      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2067      llopn = .TRUE.
2068      DO WHILE( (get_unit < 998) .AND. llopn )
2069         get_unit = get_unit + 1
2070         INQUIRE( unit = get_unit, opened = llopn )
2071      END DO
2072      IF( (get_unit == 999) .AND. llopn ) THEN
2073         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2074         get_unit = -1
2075      ENDIF
2076      !
2077   END FUNCTION get_unit
2078
2079   !!----------------------------------------------------------------------
2080END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.