New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r10984_HPC-13_IRRMANN_BDY_optimization/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r10984_HPC-13_IRRMANN_BDY_optimization/src/OCE/LBC/lib_mpp.F90 @ 11067

Last change on this file since 11067 was 11067, checked in by girrmann, 5 years ago

dev_r10984_HPC-13 : new implementation of lbc_bdy_lnk in prevision of step 2, regroup communications, see #2285

  • Property svn:keywords set to Id
File size: 91.4 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d        , mpp_lnk_3d        , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr    , mpp_lnk_3d_ptr    , mpp_lnk_4d_ptr
73   PUBLIC   mpp_lnk_bdy_2d    , mpp_lnk_bdy_3d    , mpp_lnk_bdy_4d
74   PUBLIC   mpp_lnk_bdy_2d_ptr, mpp_lnk_bdy_3d_ptr, mpp_lnk_bdy_4d_ptr
75   !
76!!gm  this should be useless
77   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
78   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
79!!gm end
80   !
81   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
82   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
83   PUBLIC   mpp_ini_north
84   PUBLIC   mpp_lnk_2d_icb
85   PUBLIC   mpp_lbc_north_icb
86   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
87   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
88   PUBLIC   mppscatter, mppgather
89   PUBLIC   mpp_ini_znl
90   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
91   
92   !! * Interfaces
93   !! define generic interface for these routine as they are called sometimes
94   !! with scalar arguments instead of array arguments, which causes problems
95   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
96   INTERFACE mpp_min
97      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
98   END INTERFACE
99   INTERFACE mpp_max
100      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
101   END INTERFACE
102   INTERFACE mpp_sum
103      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
104         &             mppsum_realdd, mppsum_a_realdd
105   END INTERFACE
106   INTERFACE mpp_minloc
107      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
108   END INTERFACE
109   INTERFACE mpp_maxloc
110      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
111   END INTERFACE
112
113   !! ========================= !!
114   !!  MPI  variable definition !!
115   !! ========================= !!
116!$AGRIF_DO_NOT_TREAT
117   INCLUDE 'mpif.h'
118!$AGRIF_END_DO_NOT_TREAT
119
120   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
121
122   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
123
124   INTEGER, PUBLIC ::   mppsize        ! number of process
125   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
126!$AGRIF_DO_NOT_TREAT
127   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
128!$AGRIF_END_DO_NOT_TREAT
129
130   INTEGER :: MPI_SUMDD
131
132   ! variables used for zonal integration
133   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
134   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
135   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
136   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
137   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
138
139   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
140   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
141   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
142   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
143   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
144   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
145   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
146   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
147   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
148
149   ! Type of send : standard, buffered, immediate
150   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
151   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
152   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
153
154   ! Communications summary report
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
157   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
158   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
159   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
160   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
161   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
162   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
163   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
164   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
165   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
166   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
167   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
168   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
169   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
170   !: name (used as id) of allreduce-delayed operations
171   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
172   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
173   !: component name where the allreduce-delayed operation is performed
174   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
175   TYPE, PUBLIC ::   DELAYARR
176      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
177      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
178   END TYPE DELAYARR
179   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
180   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
181
182   ! timing summary report
183   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
184   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
185   
186   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
187
188   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
189   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
190
191   !!----------------------------------------------------------------------
192   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
193   !! $Id$
194   !! Software governed by the CeCILL license (see ./LICENSE)
195   !!----------------------------------------------------------------------
196CONTAINS
197
198   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
199      !!----------------------------------------------------------------------
200      !!                  ***  routine mynode  ***
201      !!
202      !! ** Purpose :   Find processor unit
203      !!----------------------------------------------------------------------
204      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
205      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
206      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
207      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
208      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
209      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
210      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
211      !
212      INTEGER ::   mynode, ierr, code, ji, ii, ios
213      LOGICAL ::   mpi_was_called
214      !
215      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
216      !!----------------------------------------------------------------------
217      !
218      ii = 1
219      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
220      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
221      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
222      !
223      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
224      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
225901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
226      !
227      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
228      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
229902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
230      !
231      !                              ! control print
232      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
233      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
234      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
235      !
236      IF( jpni < 1 .OR. jpnj < 1  ) THEN
237         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
238      ELSE
239         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
240         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
241      ENDIF
242
243      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
244
245      CALL mpi_initialized ( mpi_was_called, code )
246      IF( code /= MPI_SUCCESS ) THEN
247         DO ji = 1, SIZE(ldtxt)
248            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
249         END DO
250         WRITE(*, cform_err)
251         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
252         CALL mpi_abort( mpi_comm_world, code, ierr )
253      ENDIF
254
255      IF( mpi_was_called ) THEN
256         !
257         SELECT CASE ( cn_mpi_send )
258         CASE ( 'S' )                ! Standard mpi send (blocking)
259            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
260         CASE ( 'B' )                ! Buffer mpi send (blocking)
261            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
262            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
263         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
264            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
265            l_isend = .TRUE.
266         CASE DEFAULT
267            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
268            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
269            kstop = kstop + 1
270         END SELECT
271         !
272      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
273         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
274         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
275         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
276         kstop = kstop + 1
277      ELSE
278         SELECT CASE ( cn_mpi_send )
279         CASE ( 'S' )                ! Standard mpi send (blocking)
280            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
281            CALL mpi_init( ierr )
282         CASE ( 'B' )                ! Buffer mpi send (blocking)
283            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
284            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
285         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
286            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
287            l_isend = .TRUE.
288            CALL mpi_init( ierr )
289         CASE DEFAULT
290            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
291            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
292            kstop = kstop + 1
293         END SELECT
294         !
295      ENDIF
296
297      IF( PRESENT(localComm) ) THEN
298         IF( Agrif_Root() ) THEN
299            mpi_comm_oce = localComm
300         ENDIF
301      ELSE
302         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
303         IF( code /= MPI_SUCCESS ) THEN
304            DO ji = 1, SIZE(ldtxt)
305               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
306            END DO
307            WRITE(*, cform_err)
308            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
309            CALL mpi_abort( mpi_comm_world, code, ierr )
310         ENDIF
311      ENDIF
312
313#if defined key_agrif
314      IF( Agrif_Root() ) THEN
315         CALL Agrif_MPI_Init(mpi_comm_oce)
316      ELSE
317         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
318      ENDIF
319#endif
320
321      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
322      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
323      mynode = mpprank
324
325      IF( mynode == 0 ) THEN
326         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
327         WRITE(kumond, nammpp)     
328      ENDIF
329      !
330      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
331      !
332   END FUNCTION mynode
333
334   !!----------------------------------------------------------------------
335   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
336   !!
337   !!   * Argument : dummy argument use in mpp_lnk_... routines
338   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
339   !!                cd_nat :   nature of array grid-points
340   !!                psgn   :   sign used across the north fold boundary
341   !!                kfld   :   optional, number of pt3d arrays
342   !!                cd_mpp :   optional, fill the overlap area only
343   !!                pval   :   optional, background value (used at closed boundaries)
344   !!----------------------------------------------------------------------
345   !
346   !                       !==  2D array and array of 2D pointer  ==!
347   !
348#  define DIM_2d
349#     define ROUTINE_LNK           mpp_lnk_2d
350#     include "mpp_lnk_generic.h90"
351#     undef ROUTINE_LNK
352#     define MULTI
353#     define ROUTINE_LNK           mpp_lnk_2d_ptr
354#     include "mpp_lnk_generic.h90"
355#     undef ROUTINE_LNK
356#     undef MULTI
357#  undef DIM_2d
358   !
359   !                       !==  3D array and array of 3D pointer  ==!
360   !
361#  define DIM_3d
362#     define ROUTINE_LNK           mpp_lnk_3d
363#     include "mpp_lnk_generic.h90"
364#     undef ROUTINE_LNK
365#     define MULTI
366#     define ROUTINE_LNK           mpp_lnk_3d_ptr
367#     include "mpp_lnk_generic.h90"
368#     undef ROUTINE_LNK
369#     undef MULTI
370#  undef DIM_3d
371   !
372   !                       !==  4D array and array of 4D pointer  ==!
373   !
374#  define DIM_4d
375#     define ROUTINE_LNK           mpp_lnk_4d
376#     include "mpp_lnk_generic.h90"
377#     undef ROUTINE_LNK
378#     define MULTI
379#     define ROUTINE_LNK           mpp_lnk_4d_ptr
380#     include "mpp_lnk_generic.h90"
381#     undef ROUTINE_LNK
382#     undef MULTI
383#  undef DIM_4d
384
385   !!----------------------------------------------------------------------
386   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
387   !!
388   !!   * Argument : dummy argument use in mpp_nfd_... routines
389   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
390   !!                cd_nat :   nature of array grid-points
391   !!                psgn   :   sign used across the north fold boundary
392   !!                kfld   :   optional, number of pt3d arrays
393   !!                cd_mpp :   optional, fill the overlap area only
394   !!                pval   :   optional, background value (used at closed boundaries)
395   !!----------------------------------------------------------------------
396   !
397   !                       !==  2D array and array of 2D pointer  ==!
398   !
399#  define DIM_2d
400#     define ROUTINE_NFD           mpp_nfd_2d
401#     include "mpp_nfd_generic.h90"
402#     undef ROUTINE_NFD
403#     define MULTI
404#     define ROUTINE_NFD           mpp_nfd_2d_ptr
405#     include "mpp_nfd_generic.h90"
406#     undef ROUTINE_NFD
407#     undef MULTI
408#  undef DIM_2d
409   !
410   !                       !==  3D array and array of 3D pointer  ==!
411   !
412#  define DIM_3d
413#     define ROUTINE_NFD           mpp_nfd_3d
414#     include "mpp_nfd_generic.h90"
415#     undef ROUTINE_NFD
416#     define MULTI
417#     define ROUTINE_NFD           mpp_nfd_3d_ptr
418#     include "mpp_nfd_generic.h90"
419#     undef ROUTINE_NFD
420#     undef MULTI
421#  undef DIM_3d
422   !
423   !                       !==  4D array and array of 4D pointer  ==!
424   !
425#  define DIM_4d
426#     define ROUTINE_NFD           mpp_nfd_4d
427#     include "mpp_nfd_generic.h90"
428#     undef ROUTINE_NFD
429#     define MULTI
430#     define ROUTINE_NFD           mpp_nfd_4d_ptr
431#     include "mpp_nfd_generic.h90"
432#     undef ROUTINE_NFD
433#     undef MULTI
434#  undef DIM_4d
435
436
437   !!----------------------------------------------------------------------
438   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
439   !!
440   !!   * Argument : dummy argument use in mpp_lnk_... routines
441   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
442   !!                cd_nat :   nature of array grid-points
443   !!                psgn   :   sign used across the north fold boundary
444   !!                kb_bdy :   BDY boundary set
445   !!                kfld   :   optional, number of pt3d arrays
446   !!----------------------------------------------------------------------
447   !
448   !                       !==  2D array and array of 2D pointer  ==!
449   !
450#  define DIM_2d
451#     define ROUTINE_BDY           mpp_lnk_bdy_2d
452#     include "mpp_bdy_generic.h90"
453#     undef ROUTINE_BDY
454#     define MULTI
455#     define ROUTINE_BDY           mpp_lnk_bdy_2d_ptr
456#     include "mpp_bdy_generic.h90"
457#     undef ROUTINE_BDY
458#     undef MULTI
459#  undef DIM_2d
460   !
461   !                       !==  3D array and array of 3D pointer  ==!
462   !
463#  define DIM_3d
464#     define ROUTINE_BDY           mpp_lnk_bdy_3d
465#     include "mpp_bdy_generic.h90"
466#     undef ROUTINE_BDY
467#     define MULTI
468#     define ROUTINE_BDY           mpp_lnk_bdy_3d_ptr
469#     include "mpp_bdy_generic.h90"
470#     undef ROUTINE_BDY
471#     undef MULTI
472#  undef DIM_3d
473   !
474   !                       !==  4D array and array of 4D pointer  ==!
475   !
476#  define DIM_4d
477#     define ROUTINE_BDY           mpp_lnk_bdy_4d
478#     include "mpp_bdy_generic.h90"
479#     undef ROUTINE_BDY
480#     define MULTI
481#     define ROUTINE_BDY           mpp_lnk_bdy_4d_ptr
482#     include "mpp_bdy_generic.h90"
483#     undef ROUTINE_BDY
484#     undef MULTI
485#  undef DIM_4d
486
487   !!----------------------------------------------------------------------
488   !!
489   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
490   
491   
492   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
493   
494   
495   !!----------------------------------------------------------------------
496
497
498
499   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
500      !!----------------------------------------------------------------------
501      !!                  ***  routine mppsend  ***
502      !!
503      !! ** Purpose :   Send messag passing array
504      !!
505      !!----------------------------------------------------------------------
506      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
507      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
508      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
509      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
510      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
511      !!
512      INTEGER ::   iflag
513      !!----------------------------------------------------------------------
514      !
515      SELECT CASE ( cn_mpi_send )
516      CASE ( 'S' )                ! Standard mpi send (blocking)
517         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
518      CASE ( 'B' )                ! Buffer mpi send (blocking)
519         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
520      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
521         ! be carefull, one more argument here : the mpi request identifier..
522         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
523      END SELECT
524      !
525   END SUBROUTINE mppsend
526
527
528   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
529      !!----------------------------------------------------------------------
530      !!                  ***  routine mpprecv  ***
531      !!
532      !! ** Purpose :   Receive messag passing array
533      !!
534      !!----------------------------------------------------------------------
535      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
536      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
537      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
538      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
539      !!
540      INTEGER :: istatus(mpi_status_size)
541      INTEGER :: iflag
542      INTEGER :: use_source
543      !!----------------------------------------------------------------------
544      !
545      ! If a specific process number has been passed to the receive call,
546      ! use that one. Default is to use mpi_any_source
547      use_source = mpi_any_source
548      IF( PRESENT(ksource) )   use_source = ksource
549      !
550      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
551      !
552   END SUBROUTINE mpprecv
553
554
555   SUBROUTINE mppgather( ptab, kp, pio )
556      !!----------------------------------------------------------------------
557      !!                   ***  routine mppgather  ***
558      !!
559      !! ** Purpose :   Transfert between a local subdomain array and a work
560      !!     array which is distributed following the vertical level.
561      !!
562      !!----------------------------------------------------------------------
563      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
564      INTEGER                           , INTENT(in   ) ::   kp     ! record length
565      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
566      !!
567      INTEGER :: itaille, ierror   ! temporary integer
568      !!---------------------------------------------------------------------
569      !
570      itaille = jpi * jpj
571      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
572         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
573      !
574   END SUBROUTINE mppgather
575
576
577   SUBROUTINE mppscatter( pio, kp, ptab )
578      !!----------------------------------------------------------------------
579      !!                  ***  routine mppscatter  ***
580      !!
581      !! ** Purpose :   Transfert between awork array which is distributed
582      !!      following the vertical level and the local subdomain array.
583      !!
584      !!----------------------------------------------------------------------
585      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
586      INTEGER                             ::   kp     ! Tag (not used with MPI
587      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
588      !!
589      INTEGER :: itaille, ierror   ! temporary integer
590      !!---------------------------------------------------------------------
591      !
592      itaille = jpi * jpj
593      !
594      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
595         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
596      !
597   END SUBROUTINE mppscatter
598
599   
600   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
601     !!----------------------------------------------------------------------
602      !!                   ***  routine mpp_delay_sum  ***
603      !!
604      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
605      !!
606      !!----------------------------------------------------------------------
607      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
608      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
609      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
610      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
611      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
612      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
613      !!
614      INTEGER ::   ji, isz
615      INTEGER ::   idvar
616      INTEGER ::   ierr, ilocalcomm
617      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
618      !!----------------------------------------------------------------------
619      ilocalcomm = mpi_comm_oce
620      IF( PRESENT(kcom) )   ilocalcomm = kcom
621
622      isz = SIZE(y_in)
623     
624      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
625
626      idvar = -1
627      DO ji = 1, nbdelay
628         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
629      END DO
630      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
631
632      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
633         !                                       --------------------------
634         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
635            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
636            DEALLOCATE(todelay(idvar)%z1d)
637            ndelayid(idvar) = -1                                      ! do as if we had no restart
638         ELSE
639            ALLOCATE(todelay(idvar)%y1d(isz))
640            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
641         END IF
642      ENDIF
643     
644      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
645         !                                       --------------------------
646         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
647         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
648         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
649      ENDIF
650
651      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
652
653      ! send back pout from todelay(idvar)%z1d defined at previous call
654      pout(:) = todelay(idvar)%z1d(:)
655
656      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
657#if defined key_mpi2
658      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
659      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
660      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
661#else
662      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
663#endif
664
665   END SUBROUTINE mpp_delay_sum
666
667   
668   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
669      !!----------------------------------------------------------------------
670      !!                   ***  routine mpp_delay_max  ***
671      !!
672      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
673      !!
674      !!----------------------------------------------------------------------
675      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
676      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
677      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
678      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
679      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
680      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
681      !!
682      INTEGER ::   ji, isz
683      INTEGER ::   idvar
684      INTEGER ::   ierr, ilocalcomm
685      !!----------------------------------------------------------------------
686      ilocalcomm = mpi_comm_oce
687      IF( PRESENT(kcom) )   ilocalcomm = kcom
688
689      isz = SIZE(p_in)
690
691      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
692
693      idvar = -1
694      DO ji = 1, nbdelay
695         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
696      END DO
697      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
698
699      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
700         !                                       --------------------------
701         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
702            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
703            DEALLOCATE(todelay(idvar)%z1d)
704            ndelayid(idvar) = -1                                      ! do as if we had no restart
705         END IF
706      ENDIF
707
708      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
709         !                                       --------------------------
710         ALLOCATE(todelay(idvar)%z1d(isz))
711         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
712      ENDIF
713
714      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
715
716      ! send back pout from todelay(idvar)%z1d defined at previous call
717      pout(:) = todelay(idvar)%z1d(:)
718
719      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
720#if defined key_mpi2
721      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
722      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
723      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
724#else
725      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
726#endif
727
728   END SUBROUTINE mpp_delay_max
729
730   
731   SUBROUTINE mpp_delay_rcv( kid )
732      !!----------------------------------------------------------------------
733      !!                   ***  routine mpp_delay_rcv  ***
734      !!
735      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
736      !!
737      !!----------------------------------------------------------------------
738      INTEGER,INTENT(in   )      ::  kid 
739      INTEGER ::   ierr
740      !!----------------------------------------------------------------------
741      IF( ndelayid(kid) /= -2 ) THEN 
742#if ! defined key_mpi2
743         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
744         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
745         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
746#endif
747         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
748         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
749      ENDIF
750   END SUBROUTINE mpp_delay_rcv
751
752   
753   !!----------------------------------------------------------------------
754   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
755   !!   
756   !!----------------------------------------------------------------------
757   !!
758#  define OPERATION_MAX
759#  define INTEGER_TYPE
760#  define DIM_0d
761#     define ROUTINE_ALLREDUCE           mppmax_int
762#     include "mpp_allreduce_generic.h90"
763#     undef ROUTINE_ALLREDUCE
764#  undef DIM_0d
765#  define DIM_1d
766#     define ROUTINE_ALLREDUCE           mppmax_a_int
767#     include "mpp_allreduce_generic.h90"
768#     undef ROUTINE_ALLREDUCE
769#  undef DIM_1d
770#  undef INTEGER_TYPE
771!
772#  define REAL_TYPE
773#  define DIM_0d
774#     define ROUTINE_ALLREDUCE           mppmax_real
775#     include "mpp_allreduce_generic.h90"
776#     undef ROUTINE_ALLREDUCE
777#  undef DIM_0d
778#  define DIM_1d
779#     define ROUTINE_ALLREDUCE           mppmax_a_real
780#     include "mpp_allreduce_generic.h90"
781#     undef ROUTINE_ALLREDUCE
782#  undef DIM_1d
783#  undef REAL_TYPE
784#  undef OPERATION_MAX
785   !!----------------------------------------------------------------------
786   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
787   !!   
788   !!----------------------------------------------------------------------
789   !!
790#  define OPERATION_MIN
791#  define INTEGER_TYPE
792#  define DIM_0d
793#     define ROUTINE_ALLREDUCE           mppmin_int
794#     include "mpp_allreduce_generic.h90"
795#     undef ROUTINE_ALLREDUCE
796#  undef DIM_0d
797#  define DIM_1d
798#     define ROUTINE_ALLREDUCE           mppmin_a_int
799#     include "mpp_allreduce_generic.h90"
800#     undef ROUTINE_ALLREDUCE
801#  undef DIM_1d
802#  undef INTEGER_TYPE
803!
804#  define REAL_TYPE
805#  define DIM_0d
806#     define ROUTINE_ALLREDUCE           mppmin_real
807#     include "mpp_allreduce_generic.h90"
808#     undef ROUTINE_ALLREDUCE
809#  undef DIM_0d
810#  define DIM_1d
811#     define ROUTINE_ALLREDUCE           mppmin_a_real
812#     include "mpp_allreduce_generic.h90"
813#     undef ROUTINE_ALLREDUCE
814#  undef DIM_1d
815#  undef REAL_TYPE
816#  undef OPERATION_MIN
817
818   !!----------------------------------------------------------------------
819   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
820   !!   
821   !!   Global sum of 1D array or a variable (integer, real or complex)
822   !!----------------------------------------------------------------------
823   !!
824#  define OPERATION_SUM
825#  define INTEGER_TYPE
826#  define DIM_0d
827#     define ROUTINE_ALLREDUCE           mppsum_int
828#     include "mpp_allreduce_generic.h90"
829#     undef ROUTINE_ALLREDUCE
830#  undef DIM_0d
831#  define DIM_1d
832#     define ROUTINE_ALLREDUCE           mppsum_a_int
833#     include "mpp_allreduce_generic.h90"
834#     undef ROUTINE_ALLREDUCE
835#  undef DIM_1d
836#  undef INTEGER_TYPE
837!
838#  define REAL_TYPE
839#  define DIM_0d
840#     define ROUTINE_ALLREDUCE           mppsum_real
841#     include "mpp_allreduce_generic.h90"
842#     undef ROUTINE_ALLREDUCE
843#  undef DIM_0d
844#  define DIM_1d
845#     define ROUTINE_ALLREDUCE           mppsum_a_real
846#     include "mpp_allreduce_generic.h90"
847#     undef ROUTINE_ALLREDUCE
848#  undef DIM_1d
849#  undef REAL_TYPE
850#  undef OPERATION_SUM
851
852#  define OPERATION_SUM_DD
853#  define COMPLEX_TYPE
854#  define DIM_0d
855#     define ROUTINE_ALLREDUCE           mppsum_realdd
856#     include "mpp_allreduce_generic.h90"
857#     undef ROUTINE_ALLREDUCE
858#  undef DIM_0d
859#  define DIM_1d
860#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
861#     include "mpp_allreduce_generic.h90"
862#     undef ROUTINE_ALLREDUCE
863#  undef DIM_1d
864#  undef COMPLEX_TYPE
865#  undef OPERATION_SUM_DD
866
867   !!----------------------------------------------------------------------
868   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
869   !!   
870   !!----------------------------------------------------------------------
871   !!
872#  define OPERATION_MINLOC
873#  define DIM_2d
874#     define ROUTINE_LOC           mpp_minloc2d
875#     include "mpp_loc_generic.h90"
876#     undef ROUTINE_LOC
877#  undef DIM_2d
878#  define DIM_3d
879#     define ROUTINE_LOC           mpp_minloc3d
880#     include "mpp_loc_generic.h90"
881#     undef ROUTINE_LOC
882#  undef DIM_3d
883#  undef OPERATION_MINLOC
884
885#  define OPERATION_MAXLOC
886#  define DIM_2d
887#     define ROUTINE_LOC           mpp_maxloc2d
888#     include "mpp_loc_generic.h90"
889#     undef ROUTINE_LOC
890#  undef DIM_2d
891#  define DIM_3d
892#     define ROUTINE_LOC           mpp_maxloc3d
893#     include "mpp_loc_generic.h90"
894#     undef ROUTINE_LOC
895#  undef DIM_3d
896#  undef OPERATION_MAXLOC
897
898   SUBROUTINE mppsync()
899      !!----------------------------------------------------------------------
900      !!                  ***  routine mppsync  ***
901      !!
902      !! ** Purpose :   Massively parallel processors, synchroneous
903      !!
904      !!-----------------------------------------------------------------------
905      INTEGER :: ierror
906      !!-----------------------------------------------------------------------
907      !
908      CALL mpi_barrier( mpi_comm_oce, ierror )
909      !
910   END SUBROUTINE mppsync
911
912
913   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
914      !!----------------------------------------------------------------------
915      !!                  ***  routine mppstop  ***
916      !!
917      !! ** purpose :   Stop massively parallel processors method
918      !!
919      !!----------------------------------------------------------------------
920      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
921      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
922      LOGICAL ::   llfinal, ll_force_abort
923      INTEGER ::   info
924      !!----------------------------------------------------------------------
925      llfinal = .FALSE.
926      IF( PRESENT(ldfinal) ) llfinal = ldfinal
927      ll_force_abort = .FALSE.
928      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
929      !
930      IF(ll_force_abort) THEN
931         CALL mpi_abort( MPI_COMM_WORLD )
932      ELSE
933         CALL mppsync
934         CALL mpi_finalize( info )
935      ENDIF
936      IF( .NOT. llfinal ) STOP 123
937      !
938   END SUBROUTINE mppstop
939
940
941   SUBROUTINE mpp_comm_free( kcom )
942      !!----------------------------------------------------------------------
943      INTEGER, INTENT(in) ::   kcom
944      !!
945      INTEGER :: ierr
946      !!----------------------------------------------------------------------
947      !
948      CALL MPI_COMM_FREE(kcom, ierr)
949      !
950   END SUBROUTINE mpp_comm_free
951
952
953   SUBROUTINE mpp_ini_znl( kumout )
954      !!----------------------------------------------------------------------
955      !!               ***  routine mpp_ini_znl  ***
956      !!
957      !! ** Purpose :   Initialize special communicator for computing zonal sum
958      !!
959      !! ** Method  : - Look for processors in the same row
960      !!              - Put their number in nrank_znl
961      !!              - Create group for the znl processors
962      !!              - Create a communicator for znl processors
963      !!              - Determine if processor should write znl files
964      !!
965      !! ** output
966      !!      ndim_rank_znl = number of processors on the same row
967      !!      ngrp_znl = group ID for the znl processors
968      !!      ncomm_znl = communicator for the ice procs.
969      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
970      !!
971      !!----------------------------------------------------------------------
972      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
973      !
974      INTEGER :: jproc      ! dummy loop integer
975      INTEGER :: ierr, ii   ! local integer
976      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
977      !!----------------------------------------------------------------------
978      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
979      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
980      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
981      !
982      ALLOCATE( kwork(jpnij), STAT=ierr )
983      IF( ierr /= 0 ) THEN
984         WRITE(kumout, cform_err)
985         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
986         CALL mppstop
987      ENDIF
988
989      IF( jpnj == 1 ) THEN
990         ngrp_znl  = ngrp_world
991         ncomm_znl = mpi_comm_oce
992      ELSE
993         !
994         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
995         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
996         !-$$        CALL flush(numout)
997         !
998         ! Count number of processors on the same row
999         ndim_rank_znl = 0
1000         DO jproc=1,jpnij
1001            IF ( kwork(jproc) == njmpp ) THEN
1002               ndim_rank_znl = ndim_rank_znl + 1
1003            ENDIF
1004         END DO
1005         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1006         !-$$        CALL flush(numout)
1007         ! Allocate the right size to nrank_znl
1008         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1009         ALLOCATE(nrank_znl(ndim_rank_znl))
1010         ii = 0
1011         nrank_znl (:) = 0
1012         DO jproc=1,jpnij
1013            IF ( kwork(jproc) == njmpp) THEN
1014               ii = ii + 1
1015               nrank_znl(ii) = jproc -1
1016            ENDIF
1017         END DO
1018         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1019         !-$$        CALL flush(numout)
1020
1021         ! Create the opa group
1022         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1023         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1024         !-$$        CALL flush(numout)
1025
1026         ! Create the znl group from the opa group
1027         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1028         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1029         !-$$        CALL flush(numout)
1030
1031         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1032         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1033         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1034         !-$$        CALL flush(numout)
1035         !
1036      END IF
1037
1038      ! Determines if processor if the first (starting from i=1) on the row
1039      IF ( jpni == 1 ) THEN
1040         l_znl_root = .TRUE.
1041      ELSE
1042         l_znl_root = .FALSE.
1043         kwork (1) = nimpp
1044         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1045         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1046      END IF
1047
1048      DEALLOCATE(kwork)
1049
1050   END SUBROUTINE mpp_ini_znl
1051
1052
1053   SUBROUTINE mpp_ini_north
1054      !!----------------------------------------------------------------------
1055      !!               ***  routine mpp_ini_north  ***
1056      !!
1057      !! ** Purpose :   Initialize special communicator for north folding
1058      !!      condition together with global variables needed in the mpp folding
1059      !!
1060      !! ** Method  : - Look for northern processors
1061      !!              - Put their number in nrank_north
1062      !!              - Create groups for the world processors and the north processors
1063      !!              - Create a communicator for northern processors
1064      !!
1065      !! ** output
1066      !!      njmppmax = njmpp for northern procs
1067      !!      ndim_rank_north = number of processors in the northern line
1068      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1069      !!      ngrp_world = group ID for the world processors
1070      !!      ngrp_north = group ID for the northern processors
1071      !!      ncomm_north = communicator for the northern procs.
1072      !!      north_root = number (in the world) of proc 0 in the northern comm.
1073      !!
1074      !!----------------------------------------------------------------------
1075      INTEGER ::   ierr
1076      INTEGER ::   jjproc
1077      INTEGER ::   ii, ji
1078      !!----------------------------------------------------------------------
1079      !
1080      njmppmax = MAXVAL( njmppt )
1081      !
1082      ! Look for how many procs on the northern boundary
1083      ndim_rank_north = 0
1084      DO jjproc = 1, jpnij
1085         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1086      END DO
1087      !
1088      ! Allocate the right size to nrank_north
1089      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1090      ALLOCATE( nrank_north(ndim_rank_north) )
1091
1092      ! Fill the nrank_north array with proc. number of northern procs.
1093      ! Note : the rank start at 0 in MPI
1094      ii = 0
1095      DO ji = 1, jpnij
1096         IF ( njmppt(ji) == njmppmax   ) THEN
1097            ii=ii+1
1098            nrank_north(ii)=ji-1
1099         END IF
1100      END DO
1101      !
1102      ! create the world group
1103      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1104      !
1105      ! Create the North group from the world group
1106      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1107      !
1108      ! Create the North communicator , ie the pool of procs in the north group
1109      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1110      !
1111   END SUBROUTINE mpp_ini_north
1112
1113
1114   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1115      !!---------------------------------------------------------------------
1116      !!                   ***  routine mpp_init.opa  ***
1117      !!
1118      !! ** Purpose :: export and attach a MPI buffer for bsend
1119      !!
1120      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1121      !!            but classical mpi_init
1122      !!
1123      !! History :: 01/11 :: IDRIS initial version for IBM only
1124      !!            08/04 :: R. Benshila, generalisation
1125      !!---------------------------------------------------------------------
1126      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1127      INTEGER                      , INTENT(inout) ::   ksft
1128      INTEGER                      , INTENT(  out) ::   code
1129      INTEGER                                      ::   ierr, ji
1130      LOGICAL                                      ::   mpi_was_called
1131      !!---------------------------------------------------------------------
1132      !
1133      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1134      IF ( code /= MPI_SUCCESS ) THEN
1135         DO ji = 1, SIZE(ldtxt)
1136            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1137         END DO
1138         WRITE(*, cform_err)
1139         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1140         CALL mpi_abort( mpi_comm_world, code, ierr )
1141      ENDIF
1142      !
1143      IF( .NOT. mpi_was_called ) THEN
1144         CALL mpi_init( code )
1145         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1146         IF ( code /= MPI_SUCCESS ) THEN
1147            DO ji = 1, SIZE(ldtxt)
1148               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1149            END DO
1150            WRITE(*, cform_err)
1151            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1152            CALL mpi_abort( mpi_comm_world, code, ierr )
1153         ENDIF
1154      ENDIF
1155      !
1156      IF( nn_buffer > 0 ) THEN
1157         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1158         ! Buffer allocation and attachment
1159         ALLOCATE( tampon(nn_buffer), stat = ierr )
1160         IF( ierr /= 0 ) THEN
1161            DO ji = 1, SIZE(ldtxt)
1162               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1163            END DO
1164            WRITE(*, cform_err)
1165            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1166            CALL mpi_abort( mpi_comm_world, code, ierr )
1167         END IF
1168         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1169      ENDIF
1170      !
1171   END SUBROUTINE mpi_init_oce
1172
1173
1174   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1175      !!---------------------------------------------------------------------
1176      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1177      !!
1178      !!   Modification of original codes written by David H. Bailey
1179      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1180      !!---------------------------------------------------------------------
1181      INTEGER                     , INTENT(in)    ::   ilen, itype
1182      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1183      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1184      !
1185      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1186      INTEGER  :: ji, ztmp           ! local scalar
1187      !!---------------------------------------------------------------------
1188      !
1189      ztmp = itype   ! avoid compilation warning
1190      !
1191      DO ji=1,ilen
1192      ! Compute ydda + yddb using Knuth's trick.
1193         zt1  = real(ydda(ji)) + real(yddb(ji))
1194         zerr = zt1 - real(ydda(ji))
1195         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1196                + aimag(ydda(ji)) + aimag(yddb(ji))
1197
1198         ! The result is zt1 + zt2, after normalization.
1199         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1200      END DO
1201      !
1202   END SUBROUTINE DDPDD_MPI
1203
1204
1205   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1206      !!---------------------------------------------------------------------
1207      !!                   ***  routine mpp_lbc_north_icb  ***
1208      !!
1209      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1210      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1211      !!              array with outer extra halo
1212      !!
1213      !! ** Method  :   North fold condition and mpp with more than one proc
1214      !!              in i-direction require a specific treatment. We gather
1215      !!              the 4+kextj northern lines of the global domain on 1
1216      !!              processor and apply lbc north-fold on this sub array.
1217      !!              Then we scatter the north fold array back to the processors.
1218      !!              This routine accounts for an extra halo with icebergs
1219      !!              and assumes ghost rows and columns have been suppressed.
1220      !!
1221      !!----------------------------------------------------------------------
1222      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1223      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1224      !                                                     !   = T ,  U , V , F or W -points
1225      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1226      !!                                                    ! north fold, =  1. otherwise
1227      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1228      !
1229      INTEGER ::   ji, jj, jr
1230      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1231      INTEGER ::   ipj, ij, iproc
1232      !
1233      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1234      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1235      !!----------------------------------------------------------------------
1236      !
1237      ipj=4
1238      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1239     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1240     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1241      !
1242      ztab_e(:,:)      = 0._wp
1243      znorthloc_e(:,:) = 0._wp
1244      !
1245      ij = 1 - kextj
1246      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1247      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1248         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1249         ij = ij + 1
1250      END DO
1251      !
1252      itaille = jpimax * ( ipj + 2*kextj )
1253      !
1254      IF( ln_timing ) CALL tic_tac(.TRUE.)
1255      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1256         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1257         &                ncomm_north, ierr )
1258      !
1259      IF( ln_timing ) CALL tic_tac(.FALSE.)
1260      !
1261      DO jr = 1, ndim_rank_north            ! recover the global north array
1262         iproc = nrank_north(jr) + 1
1263         ildi = nldit (iproc)
1264         ilei = nleit (iproc)
1265         iilb = nimppt(iproc)
1266         DO jj = 1-kextj, ipj+kextj
1267            DO ji = ildi, ilei
1268               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1269            END DO
1270         END DO
1271      END DO
1272
1273      ! 2. North-Fold boundary conditions
1274      ! ----------------------------------
1275      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1276
1277      ij = 1 - kextj
1278      !! Scatter back to pt2d
1279      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1280         DO ji= 1, jpi
1281            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1282         END DO
1283         ij  = ij +1
1284      END DO
1285      !
1286      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1287      !
1288   END SUBROUTINE mpp_lbc_north_icb
1289
1290
1291   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1292      !!----------------------------------------------------------------------
1293      !!                  ***  routine mpp_lnk_2d_icb  ***
1294      !!
1295      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1296      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1297      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1298      !!
1299      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1300      !!      between processors following neighboring subdomains.
1301      !!            domain parameters
1302      !!                    jpi    : first dimension of the local subdomain
1303      !!                    jpj    : second dimension of the local subdomain
1304      !!                    kexti  : number of columns for extra outer halo
1305      !!                    kextj  : number of rows for extra outer halo
1306      !!                    nbondi : mark for "east-west local boundary"
1307      !!                    nbondj : mark for "north-south local boundary"
1308      !!                    noea   : number for local neighboring processors
1309      !!                    nowe   : number for local neighboring processors
1310      !!                    noso   : number for local neighboring processors
1311      !!                    nono   : number for local neighboring processors
1312      !!----------------------------------------------------------------------
1313      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1314      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1315      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1316      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1317      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1318      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1319      !
1320      INTEGER  ::   jl   ! dummy loop indices
1321      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1322      INTEGER  ::   ipreci, iprecj             !   -       -
1323      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1324      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1325      !!
1326      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1327      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1328      !!----------------------------------------------------------------------
1329
1330      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1331      iprecj = nn_hls + kextj
1332
1333      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1334
1335      ! 1. standard boundary treatment
1336      ! ------------------------------
1337      ! Order matters Here !!!!
1338      !
1339      !                                      ! East-West boundaries
1340      !                                           !* Cyclic east-west
1341      IF( l_Iperio ) THEN
1342         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1343         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1344         !
1345      ELSE                                        !* closed
1346         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1347                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1348      ENDIF
1349      !                                      ! North-South boundaries
1350      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1351         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1352         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1353      ELSE                                        !* closed
1354         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1355                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1356      ENDIF
1357      !
1358
1359      ! north fold treatment
1360      ! -----------------------
1361      IF( npolj /= 0 ) THEN
1362         !
1363         SELECT CASE ( jpni )
1364                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1365                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1366         END SELECT
1367         !
1368      ENDIF
1369
1370      ! 2. East and west directions exchange
1371      ! ------------------------------------
1372      ! we play with the neigbours AND the row number because of the periodicity
1373      !
1374      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1375      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1376         iihom = jpi-nreci-kexti
1377         DO jl = 1, ipreci
1378            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1379            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1380         END DO
1381      END SELECT
1382      !
1383      !                           ! Migrations
1384      imigr = ipreci * ( jpj + 2*kextj )
1385      !
1386      IF( ln_timing ) CALL tic_tac(.TRUE.)
1387      !
1388      SELECT CASE ( nbondi )
1389      CASE ( -1 )
1390         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1391         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1392         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1393      CASE ( 0 )
1394         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1395         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1396         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1397         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1398         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1399         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1400      CASE ( 1 )
1401         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1402         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1403         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1404      END SELECT
1405      !
1406      IF( ln_timing ) CALL tic_tac(.FALSE.)
1407      !
1408      !                           ! Write Dirichlet lateral conditions
1409      iihom = jpi - nn_hls
1410      !
1411      SELECT CASE ( nbondi )
1412      CASE ( -1 )
1413         DO jl = 1, ipreci
1414            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1415         END DO
1416      CASE ( 0 )
1417         DO jl = 1, ipreci
1418            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1419            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1420         END DO
1421      CASE ( 1 )
1422         DO jl = 1, ipreci
1423            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1424         END DO
1425      END SELECT
1426
1427
1428      ! 3. North and south directions
1429      ! -----------------------------
1430      ! always closed : we play only with the neigbours
1431      !
1432      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1433         ijhom = jpj-nrecj-kextj
1434         DO jl = 1, iprecj
1435            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1436            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1437         END DO
1438      ENDIF
1439      !
1440      !                           ! Migrations
1441      imigr = iprecj * ( jpi + 2*kexti )
1442      !
1443      IF( ln_timing ) CALL tic_tac(.TRUE.)
1444      !
1445      SELECT CASE ( nbondj )
1446      CASE ( -1 )
1447         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1448         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1449         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1450      CASE ( 0 )
1451         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1452         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1453         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1454         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1455         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1456         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1457      CASE ( 1 )
1458         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1459         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1460         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1461      END SELECT
1462      !
1463      IF( ln_timing ) CALL tic_tac(.FALSE.)
1464      !
1465      !                           ! Write Dirichlet lateral conditions
1466      ijhom = jpj - nn_hls
1467      !
1468      SELECT CASE ( nbondj )
1469      CASE ( -1 )
1470         DO jl = 1, iprecj
1471            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1472         END DO
1473      CASE ( 0 )
1474         DO jl = 1, iprecj
1475            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1476            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1477         END DO
1478      CASE ( 1 )
1479         DO jl = 1, iprecj
1480            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1481         END DO
1482      END SELECT
1483      !
1484   END SUBROUTINE mpp_lnk_2d_icb
1485
1486
1487   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1488      !!----------------------------------------------------------------------
1489      !!                  ***  routine mpp_report  ***
1490      !!
1491      !! ** Purpose :   report use of mpp routines per time-setp
1492      !!
1493      !!----------------------------------------------------------------------
1494      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1495      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1496      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1497      !!
1498      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1499      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1500      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1501      !!----------------------------------------------------------------------
1502      !
1503      ll_lbc = .FALSE.
1504      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1505      ll_glb = .FALSE.
1506      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1507      ll_dlg = .FALSE.
1508      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1509      !
1510      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1511      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1512      ncom_freq = ncom_fsbc
1513      !
1514      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1515         IF( ll_lbc ) THEN
1516            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1517            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1518            n_sequence_lbc = n_sequence_lbc + 1
1519            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1520            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1521            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1522            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1523         ENDIF
1524         IF( ll_glb ) THEN
1525            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1526            n_sequence_glb = n_sequence_glb + 1
1527            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1528            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1529         ENDIF
1530         IF( ll_dlg ) THEN
1531            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1532            n_sequence_dlg = n_sequence_dlg + 1
1533            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1534            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1535         ENDIF
1536      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1537         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1538         WRITE(numcom,*) ' '
1539         WRITE(numcom,*) ' ------------------------------------------------------------'
1540         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1541         WRITE(numcom,*) ' ------------------------------------------------------------'
1542         WRITE(numcom,*) ' '
1543         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1544         jj = 0; jk = 0; jf = 0; jh = 0
1545         DO ji = 1, n_sequence_lbc
1546            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1547            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1548            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1549            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1550         END DO
1551         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1552         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1553         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1554         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1555         WRITE(numcom,*) ' '
1556         WRITE(numcom,*) ' lbc_lnk called'
1557         DO ji = 1, n_sequence_lbc - 1
1558            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1559               ccountname = crname_lbc(ji)
1560               crname_lbc(ji) = 'already counted'
1561               jcount = 1
1562               DO jj = ji + 1, n_sequence_lbc
1563                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1564                     jcount = jcount + 1
1565                     crname_lbc(jj) = 'already counted'
1566                  END IF
1567               END DO
1568               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1569            END IF
1570         END DO
1571         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1572            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1573         END IF
1574         WRITE(numcom,*) ' '
1575         IF ( n_sequence_glb > 0 ) THEN
1576            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1577            jj = 1
1578            DO ji = 2, n_sequence_glb
1579               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1580                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1581                  jj = 0
1582               END IF
1583               jj = jj + 1 
1584            END DO
1585            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1586            DEALLOCATE(crname_glb)
1587         ELSE
1588            WRITE(numcom,*) ' No MPI global communication '
1589         ENDIF
1590         WRITE(numcom,*) ' '
1591         IF ( n_sequence_dlg > 0 ) THEN
1592            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1593            jj = 1
1594            DO ji = 2, n_sequence_dlg
1595               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1596                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1597                  jj = 0
1598               END IF
1599               jj = jj + 1 
1600            END DO
1601            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1602            DEALLOCATE(crname_dlg)
1603         ELSE
1604            WRITE(numcom,*) ' No MPI delayed global communication '
1605         ENDIF
1606         WRITE(numcom,*) ' '
1607         WRITE(numcom,*) ' -----------------------------------------------'
1608         WRITE(numcom,*) ' '
1609         DEALLOCATE(ncomm_sequence)
1610         DEALLOCATE(crname_lbc)
1611      ENDIF
1612   END SUBROUTINE mpp_report
1613
1614   
1615   SUBROUTINE tic_tac (ld_tic, ld_global)
1616
1617    LOGICAL,           INTENT(IN) :: ld_tic
1618    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1619    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1620    REAL(wp),               SAVE :: tic_ct = 0._wp
1621    INTEGER :: ii
1622
1623    IF( ncom_stp <= nit000 ) RETURN
1624    IF( ncom_stp == nitend ) RETURN
1625    ii = 1
1626    IF( PRESENT( ld_global ) ) THEN
1627       IF( ld_global ) ii = 2
1628    END IF
1629   
1630    IF ( ld_tic ) THEN
1631       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1632       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1633    ELSE
1634       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1635       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1636    ENDIF
1637   
1638   END SUBROUTINE tic_tac
1639
1640   
1641#else
1642   !!----------------------------------------------------------------------
1643   !!   Default case:            Dummy module        share memory computing
1644   !!----------------------------------------------------------------------
1645   USE in_out_manager
1646
1647   INTERFACE mpp_sum
1648      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1649   END INTERFACE
1650   INTERFACE mpp_max
1651      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1652   END INTERFACE
1653   INTERFACE mpp_min
1654      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1655   END INTERFACE
1656   INTERFACE mpp_minloc
1657      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1658   END INTERFACE
1659   INTERFACE mpp_maxloc
1660      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1661   END INTERFACE
1662
1663   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1664   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1665   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1666
1667   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1668   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1669   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1670   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1671   TYPE ::   DELAYARR
1672      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1673      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1674   END TYPE DELAYARR
1675   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1676   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1677   !!----------------------------------------------------------------------
1678CONTAINS
1679
1680   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1681      INTEGER, INTENT(in) ::   kumout
1682      lib_mpp_alloc = 0
1683   END FUNCTION lib_mpp_alloc
1684
1685   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1686      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1687      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1688      CHARACTER(len=*) ::   ldname
1689      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1690      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1691      function_value = 0
1692      IF( .FALSE. )   ldtxt(:) = 'never done'
1693      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1694   END FUNCTION mynode
1695
1696   SUBROUTINE mppsync                       ! Dummy routine
1697   END SUBROUTINE mppsync
1698
1699   !!----------------------------------------------------------------------
1700   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1701   !!   
1702   !!----------------------------------------------------------------------
1703   !!
1704#  define OPERATION_MAX
1705#  define INTEGER_TYPE
1706#  define DIM_0d
1707#     define ROUTINE_ALLREDUCE           mppmax_int
1708#     include "mpp_allreduce_generic.h90"
1709#     undef ROUTINE_ALLREDUCE
1710#  undef DIM_0d
1711#  define DIM_1d
1712#     define ROUTINE_ALLREDUCE           mppmax_a_int
1713#     include "mpp_allreduce_generic.h90"
1714#     undef ROUTINE_ALLREDUCE
1715#  undef DIM_1d
1716#  undef INTEGER_TYPE
1717!
1718#  define REAL_TYPE
1719#  define DIM_0d
1720#     define ROUTINE_ALLREDUCE           mppmax_real
1721#     include "mpp_allreduce_generic.h90"
1722#     undef ROUTINE_ALLREDUCE
1723#  undef DIM_0d
1724#  define DIM_1d
1725#     define ROUTINE_ALLREDUCE           mppmax_a_real
1726#     include "mpp_allreduce_generic.h90"
1727#     undef ROUTINE_ALLREDUCE
1728#  undef DIM_1d
1729#  undef REAL_TYPE
1730#  undef OPERATION_MAX
1731   !!----------------------------------------------------------------------
1732   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1733   !!   
1734   !!----------------------------------------------------------------------
1735   !!
1736#  define OPERATION_MIN
1737#  define INTEGER_TYPE
1738#  define DIM_0d
1739#     define ROUTINE_ALLREDUCE           mppmin_int
1740#     include "mpp_allreduce_generic.h90"
1741#     undef ROUTINE_ALLREDUCE
1742#  undef DIM_0d
1743#  define DIM_1d
1744#     define ROUTINE_ALLREDUCE           mppmin_a_int
1745#     include "mpp_allreduce_generic.h90"
1746#     undef ROUTINE_ALLREDUCE
1747#  undef DIM_1d
1748#  undef INTEGER_TYPE
1749!
1750#  define REAL_TYPE
1751#  define DIM_0d
1752#     define ROUTINE_ALLREDUCE           mppmin_real
1753#     include "mpp_allreduce_generic.h90"
1754#     undef ROUTINE_ALLREDUCE
1755#  undef DIM_0d
1756#  define DIM_1d
1757#     define ROUTINE_ALLREDUCE           mppmin_a_real
1758#     include "mpp_allreduce_generic.h90"
1759#     undef ROUTINE_ALLREDUCE
1760#  undef DIM_1d
1761#  undef REAL_TYPE
1762#  undef OPERATION_MIN
1763
1764   !!----------------------------------------------------------------------
1765   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1766   !!   
1767   !!   Global sum of 1D array or a variable (integer, real or complex)
1768   !!----------------------------------------------------------------------
1769   !!
1770#  define OPERATION_SUM
1771#  define INTEGER_TYPE
1772#  define DIM_0d
1773#     define ROUTINE_ALLREDUCE           mppsum_int
1774#     include "mpp_allreduce_generic.h90"
1775#     undef ROUTINE_ALLREDUCE
1776#  undef DIM_0d
1777#  define DIM_1d
1778#     define ROUTINE_ALLREDUCE           mppsum_a_int
1779#     include "mpp_allreduce_generic.h90"
1780#     undef ROUTINE_ALLREDUCE
1781#  undef DIM_1d
1782#  undef INTEGER_TYPE
1783!
1784#  define REAL_TYPE
1785#  define DIM_0d
1786#     define ROUTINE_ALLREDUCE           mppsum_real
1787#     include "mpp_allreduce_generic.h90"
1788#     undef ROUTINE_ALLREDUCE
1789#  undef DIM_0d
1790#  define DIM_1d
1791#     define ROUTINE_ALLREDUCE           mppsum_a_real
1792#     include "mpp_allreduce_generic.h90"
1793#     undef ROUTINE_ALLREDUCE
1794#  undef DIM_1d
1795#  undef REAL_TYPE
1796#  undef OPERATION_SUM
1797
1798#  define OPERATION_SUM_DD
1799#  define COMPLEX_TYPE
1800#  define DIM_0d
1801#     define ROUTINE_ALLREDUCE           mppsum_realdd
1802#     include "mpp_allreduce_generic.h90"
1803#     undef ROUTINE_ALLREDUCE
1804#  undef DIM_0d
1805#  define DIM_1d
1806#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1807#     include "mpp_allreduce_generic.h90"
1808#     undef ROUTINE_ALLREDUCE
1809#  undef DIM_1d
1810#  undef COMPLEX_TYPE
1811#  undef OPERATION_SUM_DD
1812
1813   !!----------------------------------------------------------------------
1814   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1815   !!   
1816   !!----------------------------------------------------------------------
1817   !!
1818#  define OPERATION_MINLOC
1819#  define DIM_2d
1820#     define ROUTINE_LOC           mpp_minloc2d
1821#     include "mpp_loc_generic.h90"
1822#     undef ROUTINE_LOC
1823#  undef DIM_2d
1824#  define DIM_3d
1825#     define ROUTINE_LOC           mpp_minloc3d
1826#     include "mpp_loc_generic.h90"
1827#     undef ROUTINE_LOC
1828#  undef DIM_3d
1829#  undef OPERATION_MINLOC
1830
1831#  define OPERATION_MAXLOC
1832#  define DIM_2d
1833#     define ROUTINE_LOC           mpp_maxloc2d
1834#     include "mpp_loc_generic.h90"
1835#     undef ROUTINE_LOC
1836#  undef DIM_2d
1837#  define DIM_3d
1838#     define ROUTINE_LOC           mpp_maxloc3d
1839#     include "mpp_loc_generic.h90"
1840#     undef ROUTINE_LOC
1841#  undef DIM_3d
1842#  undef OPERATION_MAXLOC
1843
1844   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1845      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1846      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1847      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1848      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1849      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1850      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1851      !
1852      pout(:) = REAL(y_in(:), wp)
1853   END SUBROUTINE mpp_delay_sum
1854
1855   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1856      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1857      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1858      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1859      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1860      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1861      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1862      !
1863      pout(:) = p_in(:)
1864   END SUBROUTINE mpp_delay_max
1865
1866   SUBROUTINE mpp_delay_rcv( kid )
1867      INTEGER,INTENT(in   )      ::  kid 
1868      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1869   END SUBROUTINE mpp_delay_rcv
1870   
1871   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1872      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1873      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1874      STOP      ! non MPP case, just stop the run
1875   END SUBROUTINE mppstop
1876
1877   SUBROUTINE mpp_ini_znl( knum )
1878      INTEGER :: knum
1879      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1880   END SUBROUTINE mpp_ini_znl
1881
1882   SUBROUTINE mpp_comm_free( kcom )
1883      INTEGER :: kcom
1884      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1885   END SUBROUTINE mpp_comm_free
1886   
1887#endif
1888
1889   !!----------------------------------------------------------------------
1890   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1891   !!----------------------------------------------------------------------
1892
1893   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1894      &                 cd6, cd7, cd8, cd9, cd10 )
1895      !!----------------------------------------------------------------------
1896      !!                  ***  ROUTINE  stop_opa  ***
1897      !!
1898      !! ** Purpose :   print in ocean.outpput file a error message and
1899      !!                increment the error number (nstop) by one.
1900      !!----------------------------------------------------------------------
1901      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1902      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1903      !!----------------------------------------------------------------------
1904      !
1905      nstop = nstop + 1
1906
1907      ! force to open ocean.output file
1908      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1909       
1910      WRITE(numout,cform_err)
1911      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1912      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1913      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1914      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1915      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1916      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1917      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1918      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1919      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1920      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1921
1922                               CALL FLUSH(numout    )
1923      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1924      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1925      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1926      !
1927      IF( cd1 == 'STOP' ) THEN
1928         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1929         CALL mppstop(ld_force_abort = .true.)
1930      ENDIF
1931      !
1932   END SUBROUTINE ctl_stop
1933
1934
1935   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1936      &                 cd6, cd7, cd8, cd9, cd10 )
1937      !!----------------------------------------------------------------------
1938      !!                  ***  ROUTINE  stop_warn  ***
1939      !!
1940      !! ** Purpose :   print in ocean.outpput file a error message and
1941      !!                increment the warning number (nwarn) by one.
1942      !!----------------------------------------------------------------------
1943      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1944      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1945      !!----------------------------------------------------------------------
1946      !
1947      nwarn = nwarn + 1
1948      IF(lwp) THEN
1949         WRITE(numout,cform_war)
1950         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1951         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1952         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1953         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1954         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1955         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1956         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1957         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1958         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1959         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1960      ENDIF
1961      CALL FLUSH(numout)
1962      !
1963   END SUBROUTINE ctl_warn
1964
1965
1966   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1967      !!----------------------------------------------------------------------
1968      !!                  ***  ROUTINE ctl_opn  ***
1969      !!
1970      !! ** Purpose :   Open file and check if required file is available.
1971      !!
1972      !! ** Method  :   Fortan open
1973      !!----------------------------------------------------------------------
1974      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1975      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1976      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1977      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1978      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1979      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1980      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1981      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1982      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1983      !
1984      CHARACTER(len=80) ::   clfile
1985      INTEGER           ::   iost
1986      !!----------------------------------------------------------------------
1987      !
1988      ! adapt filename
1989      ! ----------------
1990      clfile = TRIM(cdfile)
1991      IF( PRESENT( karea ) ) THEN
1992         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1993      ENDIF
1994#if defined key_agrif
1995      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1996      knum=Agrif_Get_Unit()
1997#else
1998      knum=get_unit()
1999#endif
2000      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
2001      !
2002      iost=0
2003      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
2004         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
2005      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
2006         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
2007      ELSE
2008         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
2009      ENDIF
2010      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
2011         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
2012      IF( iost == 0 ) THEN
2013         IF(ldwp) THEN
2014            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
2015            WRITE(kout,*) '     unit   = ', knum
2016            WRITE(kout,*) '     status = ', cdstat
2017            WRITE(kout,*) '     form   = ', cdform
2018            WRITE(kout,*) '     access = ', cdacce
2019            WRITE(kout,*)
2020         ENDIF
2021      ENDIF
2022100   CONTINUE
2023      IF( iost /= 0 ) THEN
2024         IF(ldwp) THEN
2025            WRITE(kout,*)
2026            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2027            WRITE(kout,*) ' =======   ===  '
2028            WRITE(kout,*) '           unit   = ', knum
2029            WRITE(kout,*) '           status = ', cdstat
2030            WRITE(kout,*) '           form   = ', cdform
2031            WRITE(kout,*) '           access = ', cdacce
2032            WRITE(kout,*) '           iostat = ', iost
2033            WRITE(kout,*) '           we stop. verify the file '
2034            WRITE(kout,*)
2035         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
2036            WRITE(*,*)
2037            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2038            WRITE(*,*) ' =======   ===  '
2039            WRITE(*,*) '           unit   = ', knum
2040            WRITE(*,*) '           status = ', cdstat
2041            WRITE(*,*) '           form   = ', cdform
2042            WRITE(*,*) '           access = ', cdacce
2043            WRITE(*,*) '           iostat = ', iost
2044            WRITE(*,*) '           we stop. verify the file '
2045            WRITE(*,*)
2046         ENDIF
2047         CALL FLUSH( kout ) 
2048         STOP 'ctl_opn bad opening'
2049      ENDIF
2050      !
2051   END SUBROUTINE ctl_opn
2052
2053
2054   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2055      !!----------------------------------------------------------------------
2056      !!                  ***  ROUTINE ctl_nam  ***
2057      !!
2058      !! ** Purpose :   Informations when error while reading a namelist
2059      !!
2060      !! ** Method  :   Fortan open
2061      !!----------------------------------------------------------------------
2062      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2063      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2064      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2065      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2066      !!----------------------------------------------------------------------
2067      !
2068      WRITE (clios, '(I5.0)')   kios
2069      IF( kios < 0 ) THEN         
2070         CALL ctl_warn( 'end of record or file while reading namelist '   &
2071            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2072      ENDIF
2073      !
2074      IF( kios > 0 ) THEN
2075         CALL ctl_stop( 'misspelled variable in namelist '   &
2076            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2077      ENDIF
2078      kios = 0
2079      RETURN
2080      !
2081   END SUBROUTINE ctl_nam
2082
2083
2084   INTEGER FUNCTION get_unit()
2085      !!----------------------------------------------------------------------
2086      !!                  ***  FUNCTION  get_unit  ***
2087      !!
2088      !! ** Purpose :   return the index of an unused logical unit
2089      !!----------------------------------------------------------------------
2090      LOGICAL :: llopn
2091      !!----------------------------------------------------------------------
2092      !
2093      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2094      llopn = .TRUE.
2095      DO WHILE( (get_unit < 998) .AND. llopn )
2096         get_unit = get_unit + 1
2097         INQUIRE( unit = get_unit, opened = llopn )
2098      END DO
2099      IF( (get_unit == 999) .AND. llopn ) THEN
2100         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2101         get_unit = -1
2102      ENDIF
2103      !
2104   END FUNCTION get_unit
2105
2106   !!----------------------------------------------------------------------
2107END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.