New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in branches/2017/dev_r8126_ROBUST08_no_ghost/NEMOGCM/NEMO/OPA_SRC/LBC – NEMO

source: branches/2017/dev_r8126_ROBUST08_no_ghost/NEMOGCM/NEMO/OPA_SRC/LBC/lib_mpp.F90 @ 8809

Last change on this file since 8809 was 8809, checked in by acc, 6 years ago

Branch 2017/dev_r8126_ROBUST08_no_ghost. Remove multi forms of the mpp_bdy_lnk routines generated by mpp_bdy_generic.h90. They are not used and would not be an effective optimisation because of the loop over different boundaries.

  • Property svn:keywords set to Id
File size: 97.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_e     : interface (defined in lbclnk) for message passing of 2d array with extra halo (mpp_lnk_2d_e)
44   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
45   !!   mpprecv       :
46   !!   mppsend       :
47   !!   mppscatter    :
48   !!   mppgather     :
49   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
50   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
51   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
52   !!   mpp_minloc    :
53   !!   mpp_maxloc    :
54   !!   mppsync       :
55   !!   mppstop       :
56   !!   mpp_ini_north : initialisation of north fold
57!!gm   !!   mpp_lbc_north : north fold processors gathering
58   !!   mpp_lbc_north_e : variant of mpp_lbc_north for extra outer halo
59   !!   mpp_lbc_north_icb : variant of mpp_lbc_north for extra outer halo with icebergs
60   !!----------------------------------------------------------------------
61   USE dom_oce        ! ocean space and time domain
62   USE lbcnfd         ! north fold treatment
63   USE in_out_manager ! I/O manager
64   USE wrk_nemo       ! work arrays
65
66   IMPLICIT NONE
67   PRIVATE
68
69   INTERFACE mpp_nfd
70      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
71      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
72   END INTERFACE
73
74   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
75   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
76   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
77   PUBLIC   mpp_lnk_2d_e
78   !
79!!gm  this should be useless
80   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
81   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
82!!gm end
83   !
84   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
85   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
86   PUBLIC   mpp_ini_north, mpp_lbc_north_e
87!!gm   PUBLIC   mpp_ini_north, mpp_lbc_north, mpp_lbc_north_e
88   PUBLIC   mpp_lbc_north_icb, mpp_lnk_2d_icb
89   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
90   PUBLIC   mpp_max_multiple
91!!gm   PUBLIC   mpp_lnk_2d_9
92!!gm   PUBLIC   mpp_lnk_sum_3d, mpp_lnk_sum_2d
93   PUBLIC   mppscatter, mppgather
94   PUBLIC   mpp_ini_ice, mpp_ini_znl
95   PUBLIC   mppsize
96   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
97   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d
98   PUBLIC   mpprank
99   
100   !! * Interfaces
101   !! define generic interface for these routine as they are called sometimes
102   !! with scalar arguments instead of array arguments, which causes problems
103   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
104   INTERFACE mpp_min
105      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
106   END INTERFACE
107   INTERFACE mpp_max
108      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
109   END INTERFACE
110   INTERFACE mpp_sum
111      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
112         &             mppsum_realdd, mppsum_a_realdd
113   END INTERFACE
114!!gm   INTERFACE mpp_lbc_north
115!!gm      MODULE PROCEDURE mpp_lbc_north_3d, mpp_lbc_north_2d
116!!gm   END INTERFACE
117   INTERFACE mpp_minloc
118      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
119   END INTERFACE
120   INTERFACE mpp_maxloc
121      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
122   END INTERFACE
123   INTERFACE mpp_max_multiple
124      MODULE PROCEDURE mppmax_real_multiple
125   END INTERFACE
126
127   !! ========================= !!
128   !!  MPI  variable definition !!
129   !! ========================= !!
130!$AGRIF_DO_NOT_TREAT
131   INCLUDE 'mpif.h'
132!$AGRIF_END_DO_NOT_TREAT
133
134   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
135
136   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
137
138   INTEGER ::   mppsize        ! number of process
139   INTEGER ::   mpprank        ! process number  [ 0 - size-1 ]
140!$AGRIF_DO_NOT_TREAT
141   INTEGER, PUBLIC ::   mpi_comm_opa   ! opa local communicator
142!$AGRIF_END_DO_NOT_TREAT
143
144   INTEGER :: MPI_SUMDD
145
146   ! variables used in case of sea-ice
147   INTEGER, PUBLIC ::   ncomm_ice       !: communicator made by the processors with sea-ice (public so that it can be freed in limthd)
148   INTEGER         ::   ngrp_iworld     !  group ID for the world processors (for rheology)
149   INTEGER         ::   ngrp_ice        !  group ID for the ice processors (for rheology)
150   INTEGER         ::   ndim_rank_ice   !  number of 'ice' processors
151   INTEGER         ::   n_ice_root      !  number (in the comm_ice) of proc 0 in the ice comm
152   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_ice     ! dimension ndim_rank_ice
153
154   ! variables used for zonal integration
155   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
156   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
157   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
158   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
159   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
160
161   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
162   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
163   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
164   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
165   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
166   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
167   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
168   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
169   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
170
171   ! Type of send : standard, buffered, immediate
172   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
173   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
174   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
175
176   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
177
178   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
179   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
180
181   !!----------------------------------------------------------------------
182   !! NEMO/OPA 4.0 , NEMO Consortium (2017)
183   !! $Id$
184   !! Software governed by the CeCILL licence     (NEMOGCM/NEMO_CeCILL.txt)
185   !!----------------------------------------------------------------------
186CONTAINS
187
188   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
189      !!----------------------------------------------------------------------
190      !!                  ***  routine mynode  ***
191      !!
192      !! ** Purpose :   Find processor unit
193      !!----------------------------------------------------------------------
194      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
195      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
196      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
197      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
198      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
199      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
200      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
201      !
202      INTEGER ::   mynode, ierr, code, ji, ii, ios
203      LOGICAL ::   mpi_was_called
204      !
205      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
206      !!----------------------------------------------------------------------
207      !
208      ii = 1
209      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
210      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
211      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
212      !
213      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
214      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
215901   IF( ios /= 0 ) CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
216      !
217      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
218      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
219902   IF( ios /= 0 ) CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
220      !
221      !                              ! control print
222      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
223      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
224      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
225      !
226#if defined key_agrif
227      IF( .NOT. Agrif_Root() ) THEN
228         jpni  = Agrif_Parent(jpni )
229         jpnj  = Agrif_Parent(jpnj )
230         jpnij = Agrif_Parent(jpnij)
231      ENDIF
232#endif
233      !
234      IF( jpnij < 1 ) THEN         ! If jpnij is not specified in namelist then we calculate it
235         jpnij = jpni * jpnj       ! this means there will be no land cutting out.
236      ENDIF
237
238      IF( jpni < 1 .OR. jpnj < 1  ) THEN
239         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
240      ELSE
241         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
242         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
243         WRITE(ldtxt(ii),*) '      number of local domains           jpnij = ',jpnij      ;   ii = ii + 1
244      ENDIF
245
246      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
247
248      CALL mpi_initialized ( mpi_was_called, code )
249      IF( code /= MPI_SUCCESS ) THEN
250         DO ji = 1, SIZE(ldtxt)
251            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
252         END DO
253         WRITE(*, cform_err)
254         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
255         CALL mpi_abort( mpi_comm_world, code, ierr )
256      ENDIF
257
258      IF( mpi_was_called ) THEN
259         !
260         SELECT CASE ( cn_mpi_send )
261         CASE ( 'S' )                ! Standard mpi send (blocking)
262            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
263         CASE ( 'B' )                ! Buffer mpi send (blocking)
264            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
265            IF( Agrif_Root() )   CALL mpi_init_opa( ldtxt, ii, ierr )
266         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
267            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
268            l_isend = .TRUE.
269         CASE DEFAULT
270            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
271            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
272            kstop = kstop + 1
273         END SELECT
274         !
275      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
276         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
277         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
278         kstop = kstop + 1
279      ELSE
280         SELECT CASE ( cn_mpi_send )
281         CASE ( 'S' )                ! Standard mpi send (blocking)
282            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
283            CALL mpi_init( ierr )
284         CASE ( 'B' )                ! Buffer mpi send (blocking)
285            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
286            IF( Agrif_Root() )   CALL mpi_init_opa( ldtxt, ii, ierr )
287         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
288            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
289            l_isend = .TRUE.
290            CALL mpi_init( ierr )
291         CASE DEFAULT
292            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
293            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
294            kstop = kstop + 1
295         END SELECT
296         !
297      ENDIF
298
299      IF( PRESENT(localComm) ) THEN
300         IF( Agrif_Root() ) THEN
301            mpi_comm_opa = localComm
302         ENDIF
303      ELSE
304         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_opa, code)
305         IF( code /= MPI_SUCCESS ) THEN
306            DO ji = 1, SIZE(ldtxt)
307               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
308            END DO
309            WRITE(*, cform_err)
310            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
311            CALL mpi_abort( mpi_comm_world, code, ierr )
312         ENDIF
313      ENDIF
314
315#if defined key_agrif
316      IF( Agrif_Root() ) THEN
317         CALL Agrif_MPI_Init(mpi_comm_opa)
318      ELSE
319         CALL Agrif_MPI_set_grid_comm(mpi_comm_opa)
320      ENDIF
321#endif
322
323      CALL mpi_comm_rank( mpi_comm_opa, mpprank, ierr )
324      CALL mpi_comm_size( mpi_comm_opa, mppsize, ierr )
325      mynode = mpprank
326
327      IF( mynode == 0 ) THEN
328         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
329         WRITE(kumond, nammpp)     
330      ENDIF
331      !
332      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
333      !
334   END FUNCTION mynode
335
336   !!----------------------------------------------------------------------
337   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
338   !!
339   !!   * Argument : dummy argument use in mpp_lnk_... routines
340   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
341   !!                cd_nat :   nature of array grid-points
342   !!                psgn   :   sign used across the north fold boundary
343   !!                kfld   :   optional, number of pt3d arrays
344   !!                cd_mpp :   optional, fill the overlap area only
345   !!                pval   :   optional, background value (used at closed boundaries)
346   !!----------------------------------------------------------------------
347   !
348   !                       !==  2D array and array of 2D pointer  ==!
349   !
350#  define DIM_2d
351#     define ROUTINE_LNK           mpp_lnk_2d
352#     include "mpp_lnk_generic.h90"
353#     undef ROUTINE_LNK
354#     define MULTI
355#     define ROUTINE_LNK           mpp_lnk_2d_ptr
356#     include "mpp_lnk_generic.h90"
357#     undef ROUTINE_LNK
358#     undef MULTI
359#  undef DIM_2d
360   !
361   !                       !==  3D array and array of 3D pointer  ==!
362   !
363#  define DIM_3d
364#     define ROUTINE_LNK           mpp_lnk_3d
365#     include "mpp_lnk_generic.h90"
366#     undef ROUTINE_LNK
367#     define MULTI
368#     define ROUTINE_LNK           mpp_lnk_3d_ptr
369#     include "mpp_lnk_generic.h90"
370#     undef ROUTINE_LNK
371#     undef MULTI
372#  undef DIM_3d
373   !
374   !                       !==  4D array and array of 4D pointer  ==!
375   !
376#  define DIM_4d
377#     define ROUTINE_LNK           mpp_lnk_4d
378#     include "mpp_lnk_generic.h90"
379#     undef ROUTINE_LNK
380#     define MULTI
381#     define ROUTINE_LNK           mpp_lnk_4d_ptr
382#     include "mpp_lnk_generic.h90"
383#     undef ROUTINE_LNK
384#     undef MULTI
385#  undef DIM_4d
386
387   !!----------------------------------------------------------------------
388   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
389   !!
390   !!   * Argument : dummy argument use in mpp_nfd_... routines
391   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
392   !!                cd_nat :   nature of array grid-points
393   !!                psgn   :   sign used across the north fold boundary
394   !!                kfld   :   optional, number of pt3d arrays
395   !!                cd_mpp :   optional, fill the overlap area only
396   !!                pval   :   optional, background value (used at closed boundaries)
397   !!----------------------------------------------------------------------
398   !
399   !                       !==  2D array and array of 2D pointer  ==!
400   !
401#  define DIM_2d
402#     define ROUTINE_NFD           mpp_nfd_2d
403#     include "mpp_nfd_generic.h90"
404#     undef ROUTINE_NFD
405#     define MULTI
406#     define ROUTINE_NFD           mpp_nfd_2d_ptr
407#     include "mpp_nfd_generic.h90"
408#     undef ROUTINE_NFD
409#     undef MULTI
410#  undef DIM_2d
411   !
412   !                       !==  3D array and array of 3D pointer  ==!
413   !
414#  define DIM_3d
415#     define ROUTINE_NFD           mpp_nfd_3d
416#     include "mpp_nfd_generic.h90"
417#     undef ROUTINE_NFD
418#     define MULTI
419#     define ROUTINE_NFD           mpp_nfd_3d_ptr
420#     include "mpp_nfd_generic.h90"
421#     undef ROUTINE_NFD
422#     undef MULTI
423#  undef DIM_3d
424   !
425   !                       !==  4D array and array of 4D pointer  ==!
426   !
427#  define DIM_4d
428#     define ROUTINE_NFD           mpp_nfd_4d
429#     include "mpp_nfd_generic.h90"
430#     undef ROUTINE_NFD
431#     define MULTI
432#     define ROUTINE_NFD           mpp_nfd_4d_ptr
433#     include "mpp_nfd_generic.h90"
434#     undef ROUTINE_NFD
435#     undef MULTI
436#  undef DIM_4d
437
438
439   !!----------------------------------------------------------------------
440   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
441   !!
442   !!   * Argument : dummy argument use in mpp_lnk_... routines
443   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
444   !!                cd_nat :   nature of array grid-points
445   !!                psgn   :   sign used across the north fold boundary
446   !!                kb_bdy :   BDY boundary set
447   !!                kfld   :   optional, number of pt3d arrays
448   !!----------------------------------------------------------------------
449   !
450   !                       !==  2D array and array of 2D pointer  ==!
451   !
452#  define DIM_2d
453#     define ROUTINE_BDY           mpp_lnk_bdy_2d
454#     include "mpp_bdy_generic.h90"
455#     undef ROUTINE_BDY
456#  undef DIM_2d
457   !
458   !                       !==  3D array and array of 3D pointer  ==!
459   !
460#  define DIM_3d
461#     define ROUTINE_BDY           mpp_lnk_bdy_3d
462#     include "mpp_bdy_generic.h90"
463#     undef ROUTINE_BDY
464#  undef DIM_3d
465   !
466   !                       !==  4D array and array of 4D pointer  ==!
467   !
468!!#  define DIM_4d
469!!#     define ROUTINE_BDY           mpp_lnk_bdy_4d
470!!#     include "mpp_bdy_generic.h90"
471!!#     undef ROUTINE_BDY
472!!#  undef DIM_4d
473
474   !!----------------------------------------------------------------------
475   !!
476   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
477   
478   
479   !!    mpp_lnk_2d_e     utilisé dans ICB
480
481
482   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
483   
484   
485   !!----------------------------------------------------------------------
486
487
488   SUBROUTINE mpp_lnk_2d_e( pt2d, cd_type, psgn, jpri, jprj )
489      !!----------------------------------------------------------------------
490      !!                  ***  routine mpp_lnk_2d_e  ***
491      !!
492      !! ** Purpose :   Message passing manadgement for 2d array (with halo)
493      !!
494      !! ** Method  :   Use mppsend and mpprecv function for passing mask
495      !!      between processors following neighboring subdomains.
496      !!            domain parameters
497      !!                    nlci   : first dimension of the local subdomain
498      !!                    nlcj   : second dimension of the local subdomain
499      !!                    jpri   : number of rows for extra outer halo
500      !!                    jprj   : number of columns for extra outer halo
501      !!                    nbondi : mark for "east-west local boundary"
502      !!                    nbondj : mark for "north-south local boundary"
503      !!                    noea   : number for local neighboring processors
504      !!                    nowe   : number for local neighboring processors
505      !!                    noso   : number for local neighboring processors
506      !!                    nono   : number for local neighboring processors
507      !!
508      !!----------------------------------------------------------------------
509      INTEGER                                             , INTENT(in   ) ::   jpri
510      INTEGER                                             , INTENT(in   ) ::   jprj
511      REAL(wp), DIMENSION(1-jpri:jpi+jpri,1-jprj:jpj+jprj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
512      CHARACTER(len=1)                                    , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
513      !                                                                                 ! = T , U , V , F , W and I points
514      REAL(wp)                                            , INTENT(in   ) ::   psgn     ! =-1 the sign change across the
515      !!                                                                                ! north boundary, =  1. otherwise
516      INTEGER  ::   jl   ! dummy loop indices
517      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
518      INTEGER  ::   ipreci, iprecj             ! temporary integers
519      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
520      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
521      !!
522      REAL(wp), DIMENSION(1-jpri:jpi+jpri,nn_hls+jprj,2) :: r2dns
523      REAL(wp), DIMENSION(1-jpri:jpi+jpri,nn_hls+jprj,2) :: r2dsn
524      REAL(wp), DIMENSION(1-jprj:jpj+jprj,nn_hls+jpri,2) :: r2dwe
525      REAL(wp), DIMENSION(1-jprj:jpj+jprj,nn_hls+jpri,2) :: r2dew
526      !!----------------------------------------------------------------------
527
528      ipreci = nn_hls + jpri      ! take into account outer extra 2D overlap area
529      iprecj = nn_hls + jprj
530
531
532      ! 1. standard boundary treatment   (CAUTION: the order matters Here !!!! )
533      ! ------------------------------
534      !                                !== North-South boundaries
535      !                                      !* cyclic
536      IF( nbondj == 2 .AND. jperio == 7 ) THEN
537         pt2d(:, 1-jprj:  1     ) = pt2d ( :, jpjm1-jprj:jpjm1 )
538         pt2d(:, jpj   :jpj+jprj) = pt2d ( :, 2         :2+jprj)
539      ELSE                                   !* closed
540         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-jprj   :  nn_hls  ) = 0._wp     ! south except at F-point
541                                      pt2d(:,nlcj-nn_hls+1:jpj+jprj) = 0._wp     ! north
542      ENDIF
543      !                                !== East-West boundaries
544      !                                      !* Cyclic east-west
545      IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
546         pt2d(1-jpri:     1    ,:) = pt2d(jpim1-jpri:  jpim1 ,:)              ! east
547         pt2d(   jpi  :jpi+jpri,:) = pt2d(     2      :2+jpri,:)              ! west
548      ELSE                                   !* closed
549         IF( .NOT. cd_type == 'F' )   pt2d(  1-jpri   :nn_hls    ,:) = 0._wp  ! south except at F-point
550                                      pt2d(nlci-nn_hls+1:jpi+jpri,:) = 0._wp  ! north
551      ENDIF
552      !
553      ! north fold treatment
554      ! --------------------
555      IF( npolj /= 0 ) THEN
556         !
557         SELECT CASE ( jpni )
558!!gm ERROR        CASE ( 1 )     ;   CALL lbc_nfd        ( pt2d(1:jpi,1:jpj+jprj), cd_type, psgn, pr2dj=jprj )
559                  CASE DEFAULT   ;   CALL mpp_lbc_north_e( pt2d                  , cd_type, psgn             )
560         END SELECT
561         !
562      ENDIF
563
564      ! 2. East and west directions exchange
565      ! ------------------------------------
566      ! we play with the neigbours AND the row number because of the periodicity
567      !
568      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
569      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
570         iihom = nlci-nreci-jpri
571         DO jl = 1, ipreci
572            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
573            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
574         END DO
575      END SELECT
576      !
577      !                           ! Migrations
578      imigr = ipreci * ( jpj + 2*jprj)
579      !
580      SELECT CASE ( nbondi )
581      CASE ( -1 )
582         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req1 )
583         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
584         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
585      CASE ( 0 )
586         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
587         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req2 )
588         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
589         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
590         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
591         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
592      CASE ( 1 )
593         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
594         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
595         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
596      END SELECT
597      !
598      !                           ! Write Dirichlet lateral conditions
599      iihom = nlci - nn_hls
600      !
601      SELECT CASE ( nbondi )
602      CASE ( -1 )
603         DO jl = 1, ipreci
604            pt2d(iihom+jl,:) = r2dew(:,jl,2)
605         END DO
606      CASE ( 0 )
607         DO jl = 1, ipreci
608            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
609            pt2d( iihom+jl,:) = r2dew(:,jl,2)
610         END DO
611      CASE ( 1 )
612         DO jl = 1, ipreci
613            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
614         END DO
615      END SELECT
616
617      ! 3. North and south directions
618      ! -----------------------------
619      ! always closed : we play only with the neigbours
620      !
621      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
622         ijhom = nlcj-nrecj-jprj
623         DO jl = 1, iprecj
624            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
625            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
626         END DO
627      ENDIF
628      !
629      !                           ! Migrations
630      imigr = iprecj * ( jpi + 2*jpri )
631      !
632      SELECT CASE ( nbondj )
633      CASE ( -1 )
634         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req1 )
635         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
636         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
637      CASE ( 0 )
638         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
639         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req2 )
640         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
641         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
642         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
643         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
644      CASE ( 1 )
645         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
646         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
647         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
648      END SELECT
649      !
650      !                           ! Write Dirichlet lateral conditions
651      ijhom = nlcj - nn_hls
652      !
653      SELECT CASE ( nbondj )
654      CASE ( -1 )
655         DO jl = 1, iprecj
656            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
657         END DO
658      CASE ( 0 )
659         DO jl = 1, iprecj
660            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
661            pt2d(:,ijhom+jl ) = r2dns(:,jl,2)
662         END DO
663      CASE ( 1 )
664         DO jl = 1, iprecj
665            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
666         END DO
667      END SELECT
668      !
669   END SUBROUTINE mpp_lnk_2d_e
670
671
672   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
673      !!----------------------------------------------------------------------
674      !!                  ***  routine mppsend  ***
675      !!
676      !! ** Purpose :   Send messag passing array
677      !!
678      !!----------------------------------------------------------------------
679      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
680      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
681      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
682      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
683      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
684      !!
685      INTEGER ::   iflag
686      !!----------------------------------------------------------------------
687      !
688      SELECT CASE ( cn_mpi_send )
689      CASE ( 'S' )                ! Standard mpi send (blocking)
690         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa        , iflag )
691      CASE ( 'B' )                ! Buffer mpi send (blocking)
692         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa        , iflag )
693      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
694         ! be carefull, one more argument here : the mpi request identifier..
695         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa, md_req, iflag )
696      END SELECT
697      !
698   END SUBROUTINE mppsend
699
700
701   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
702      !!----------------------------------------------------------------------
703      !!                  ***  routine mpprecv  ***
704      !!
705      !! ** Purpose :   Receive messag passing array
706      !!
707      !!----------------------------------------------------------------------
708      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
709      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
710      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
711      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
712      !!
713      INTEGER :: istatus(mpi_status_size)
714      INTEGER :: iflag
715      INTEGER :: use_source
716      !!----------------------------------------------------------------------
717      !
718      ! If a specific process number has been passed to the receive call,
719      ! use that one. Default is to use mpi_any_source
720      use_source = mpi_any_source
721      IF( PRESENT(ksource) )   use_source = ksource
722      !
723      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_opa, istatus, iflag )
724      !
725   END SUBROUTINE mpprecv
726
727
728   SUBROUTINE mppgather( ptab, kp, pio )
729      !!----------------------------------------------------------------------
730      !!                   ***  routine mppgather  ***
731      !!
732      !! ** Purpose :   Transfert between a local subdomain array and a work
733      !!     array which is distributed following the vertical level.
734      !!
735      !!----------------------------------------------------------------------
736      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
737      INTEGER                           , INTENT(in   ) ::   kp     ! record length
738      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
739      !!
740      INTEGER :: itaille, ierror   ! temporary integer
741      !!---------------------------------------------------------------------
742      !
743      itaille = jpi * jpj
744      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
745         &                            mpi_double_precision, kp , mpi_comm_opa, ierror )
746      !
747   END SUBROUTINE mppgather
748
749
750   SUBROUTINE mppscatter( pio, kp, ptab )
751      !!----------------------------------------------------------------------
752      !!                  ***  routine mppscatter  ***
753      !!
754      !! ** Purpose :   Transfert between awork array which is distributed
755      !!      following the vertical level and the local subdomain array.
756      !!
757      !!----------------------------------------------------------------------
758      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
759      INTEGER                             ::   kp     ! Tag (not used with MPI
760      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
761      !!
762      INTEGER :: itaille, ierror   ! temporary integer
763      !!---------------------------------------------------------------------
764      !
765      itaille = jpi * jpj
766      !
767      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
768         &                            mpi_double_precision, kp  , mpi_comm_opa, ierror )
769      !
770   END SUBROUTINE mppscatter
771
772   !!----------------------------------------------------------------------
773   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
774   !!   
775   !!----------------------------------------------------------------------
776   !!
777   SUBROUTINE mppmax_a_int( ktab, kdim, kcom )
778      !!----------------------------------------------------------------------
779      INTEGER , INTENT(in   )                  ::   kdim   ! size of array
780      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
781      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom   !
782      INTEGER :: ierror, ilocalcomm   ! temporary integer
783      INTEGER, DIMENSION(kdim) ::   iwork
784      !!----------------------------------------------------------------------
785      ilocalcomm = mpi_comm_opa
786      IF( PRESENT(kcom) )   ilocalcomm = kcom
787      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_max, ilocalcomm, ierror )
788      ktab(:) = iwork(:)
789   END SUBROUTINE mppmax_a_int
790   !!
791   SUBROUTINE mppmax_int( ktab, kcom )
792      !!----------------------------------------------------------------------
793      INTEGER, INTENT(inout)           ::   ktab   ! ???
794      INTEGER, INTENT(in   ), OPTIONAL ::   kcom   ! ???
795      INTEGER ::   ierror, iwork, ilocalcomm   ! temporary integer
796      !!----------------------------------------------------------------------
797      ilocalcomm = mpi_comm_opa
798      IF( PRESENT(kcom) )   ilocalcomm = kcom
799      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_max, ilocalcomm, ierror )
800      ktab = iwork
801   END SUBROUTINE mppmax_int
802   !!
803   SUBROUTINE mppmax_a_real( ptab, kdim, kcom )
804      !!----------------------------------------------------------------------
805      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab
806      INTEGER                  , INTENT(in   ) ::   kdim
807      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom
808      INTEGER :: ierror, ilocalcomm
809      REAL(wp), DIMENSION(kdim) ::  zwork
810      !!----------------------------------------------------------------------
811      ilocalcomm = mpi_comm_opa
812      IF( PRESENT(kcom) )   ilocalcomm = kcom
813      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
814      ptab(:) = zwork(:)
815   END SUBROUTINE mppmax_a_real
816   !!
817   SUBROUTINE mppmax_real( ptab, kcom )
818      !!----------------------------------------------------------------------
819      REAL(wp), INTENT(inout)           ::   ptab   ! ???
820      INTEGER , INTENT(in   ), OPTIONAL ::   kcom   ! ???
821      INTEGER  ::   ierror, ilocalcomm
822      REAL(wp) ::   zwork
823      !!----------------------------------------------------------------------
824      ilocalcomm = mpi_comm_opa
825      IF( PRESENT(kcom) )   ilocalcomm = kcom!
826      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_max, ilocalcomm, ierror )
827      ptab = zwork
828   END SUBROUTINE mppmax_real
829
830
831   !!----------------------------------------------------------------------
832   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
833   !!   
834   !!----------------------------------------------------------------------
835   !!
836   SUBROUTINE mppmin_a_int( ktab, kdim, kcom )
837      !!----------------------------------------------------------------------
838      INTEGER , INTENT( in  )                  ::   kdim   ! size of array
839      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
840      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom   ! input array
841      !!
842      INTEGER ::   ierror, ilocalcomm   ! temporary integer
843      INTEGER, DIMENSION(kdim) ::   iwork
844      !!----------------------------------------------------------------------
845      ilocalcomm = mpi_comm_opa
846      IF( PRESENT(kcom) )   ilocalcomm = kcom
847      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_min, ilocalcomm, ierror )
848      ktab(:) = iwork(:)
849   END SUBROUTINE mppmin_a_int
850   !!
851   SUBROUTINE mppmin_int( ktab, kcom )
852      !!----------------------------------------------------------------------
853      INTEGER, INTENT(inout) ::   ktab      ! ???
854      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom        ! input array
855      !!
856      INTEGER ::  ierror, iwork, ilocalcomm
857      !!----------------------------------------------------------------------
858      ilocalcomm = mpi_comm_opa
859      IF( PRESENT(kcom) )   ilocalcomm = kcom
860      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_min, ilocalcomm, ierror )
861      ktab = iwork
862   END SUBROUTINE mppmin_int
863   !!
864   SUBROUTINE mppmin_a_real( ptab, kdim, kcom )
865      !!----------------------------------------------------------------------
866      INTEGER , INTENT(in   )                  ::   kdim
867      REAL(wp), INTENT(inout), DIMENSION(kdim) ::   ptab
868      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom
869      INTEGER :: ierror, ilocalcomm
870      REAL(wp), DIMENSION(kdim) ::   zwork
871      !!-----------------------------------------------------------------------
872      ilocalcomm = mpi_comm_opa
873      IF( PRESENT(kcom) )   ilocalcomm = kcom
874      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_min, ilocalcomm, ierror )
875      ptab(:) = zwork(:)
876   END SUBROUTINE mppmin_a_real
877   !!
878   SUBROUTINE mppmin_real( ptab, kcom )
879      !!-----------------------------------------------------------------------
880      REAL(wp), INTENT(inout)           ::   ptab        !
881      INTEGER , INTENT(in   ), OPTIONAL :: kcom
882      INTEGER  ::   ierror, ilocalcomm
883      REAL(wp) ::   zwork
884      !!-----------------------------------------------------------------------
885      ilocalcomm = mpi_comm_opa
886      IF( PRESENT(kcom) )   ilocalcomm = kcom
887      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_min, ilocalcomm, ierror )
888      ptab = zwork
889   END SUBROUTINE mppmin_real
890
891
892   !!----------------------------------------------------------------------
893   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
894   !!   
895   !!   Global sum of 1D array or a variable (integer, real or complex)
896   !!----------------------------------------------------------------------
897   !!
898   SUBROUTINE mppsum_a_int( ktab, kdim )
899      !!----------------------------------------------------------------------
900      INTEGER, INTENT(in   )                   ::   kdim   ! ???
901      INTEGER, INTENT(inout), DIMENSION (kdim) ::   ktab   ! ???
902      INTEGER :: ierror
903      INTEGER, DIMENSION (kdim) ::  iwork
904      !!----------------------------------------------------------------------
905      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_sum, mpi_comm_opa, ierror )
906      ktab(:) = iwork(:)
907   END SUBROUTINE mppsum_a_int
908   !!
909   SUBROUTINE mppsum_int( ktab )
910      !!----------------------------------------------------------------------
911      INTEGER, INTENT(inout) ::   ktab
912      INTEGER :: ierror, iwork
913      !!----------------------------------------------------------------------
914      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_sum, mpi_comm_opa, ierror )
915      ktab = iwork
916   END SUBROUTINE mppsum_int
917   !!
918   SUBROUTINE mppsum_a_real( ptab, kdim, kcom )
919      !!-----------------------------------------------------------------------
920      INTEGER                  , INTENT(in   ) ::   kdim   ! size of ptab
921      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   ptab   ! input array
922      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! specific communicator
923      INTEGER  ::   ierror, ilocalcomm    ! local integer
924      REAL(wp) ::   zwork(kdim)           ! local workspace
925      !!-----------------------------------------------------------------------
926      ilocalcomm = mpi_comm_opa
927      IF( PRESENT(kcom) )   ilocalcomm = kcom
928      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
929      ptab(:) = zwork(:)
930   END SUBROUTINE mppsum_a_real
931   !!
932   SUBROUTINE mppsum_real( ptab, kcom )
933      !!-----------------------------------------------------------------------
934      REAL(wp)          , INTENT(inout)           ::   ptab   ! input scalar
935      INTEGER , OPTIONAL, INTENT(in   ) ::   kcom
936      INTEGER  ::   ierror, ilocalcomm
937      REAL(wp) ::   zwork
938      !!-----------------------------------------------------------------------
939      ilocalcomm = mpi_comm_opa
940      IF( PRESENT(kcom) )   ilocalcomm = kcom
941      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_sum, ilocalcomm, ierror )
942      ptab = zwork
943   END SUBROUTINE mppsum_real
944   !!
945   SUBROUTINE mppsum_realdd( ytab, kcom )
946      !!-----------------------------------------------------------------------
947      COMPLEX(wp)          , INTENT(inout) ::   ytab    ! input scalar
948      INTEGER    , OPTIONAL, INTENT(in   ) ::   kcom
949      INTEGER     ::   ierror, ilocalcomm
950      COMPLEX(wp) ::   zwork
951      !!-----------------------------------------------------------------------
952      ilocalcomm = mpi_comm_opa
953      IF( PRESENT(kcom) )   ilocalcomm = kcom
954      CALL MPI_ALLREDUCE( ytab, zwork, 1, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
955      ytab = zwork
956   END SUBROUTINE mppsum_realdd
957   !!
958   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
959      !!----------------------------------------------------------------------
960      INTEGER                     , INTENT(in   ) ::   kdim   ! size of ytab
961      COMPLEX(wp), DIMENSION(kdim), INTENT(inout) ::   ytab   ! input array
962      INTEGER    , OPTIONAL       , INTENT(in   ) ::   kcom
963      INTEGER:: ierror, ilocalcomm    ! local integer
964      COMPLEX(wp), DIMENSION(kdim) :: zwork     ! temporary workspace
965      !!-----------------------------------------------------------------------
966      ilocalcomm = mpi_comm_opa
967      IF( PRESENT(kcom) )   ilocalcomm = kcom
968      CALL MPI_ALLREDUCE( ytab, zwork, kdim, MPI_DOUBLE_COMPLEX, MPI_SUMDD, ilocalcomm, ierror )
969      ytab(:) = zwork(:)
970   END SUBROUTINE mppsum_a_realdd
971   
972
973   SUBROUTINE mppmax_real_multiple( pt1d, kdim, kcom  )
974      !!----------------------------------------------------------------------
975      !!                  ***  routine mppmax_real  ***
976      !!
977      !! ** Purpose :   Maximum across processor of each element of a 1D arrays
978      !!
979      !!----------------------------------------------------------------------
980      REAL(wp), DIMENSION(kdim), INTENT(inout) ::   pt1d   ! 1D arrays
981      INTEGER                  , INTENT(in   ) ::   kdim
982      INTEGER , OPTIONAL       , INTENT(in   ) ::   kcom   ! local communicator
983      !!
984      INTEGER  ::   ierror, ilocalcomm
985      REAL(wp), DIMENSION(kdim) ::  zwork
986      !!----------------------------------------------------------------------
987      ilocalcomm = mpi_comm_opa
988      IF( PRESENT(kcom) )   ilocalcomm = kcom
989      !
990      CALL mpi_allreduce( pt1d, zwork, kdim, mpi_double_precision, mpi_max, ilocalcomm, ierror )
991      pt1d(:) = zwork(:)
992      !
993   END SUBROUTINE mppmax_real_multiple
994
995
996   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki,kj )
997      !!------------------------------------------------------------------------
998      !!             ***  routine mpp_minloc  ***
999      !!
1000      !! ** Purpose :   Compute the global minimum of an array ptab
1001      !!              and also give its global position
1002      !!
1003      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1004      !!
1005      !!--------------------------------------------------------------------------
1006      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab    ! Local 2D array
1007      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask   ! Local mask
1008      REAL(wp)                     , INTENT(  out) ::   pmin    ! Global minimum of ptab
1009      INTEGER                      , INTENT(  out) ::   ki, kj  ! index of minimum in global frame
1010      !
1011      INTEGER :: ierror
1012      INTEGER , DIMENSION(2)   ::   ilocs
1013      REAL(wp) ::   zmin   ! local minimum
1014      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1015      !!-----------------------------------------------------------------------
1016      !
1017      zmin  = MINVAL( ptab(:,:) , mask= pmask == 1._wp )
1018      ilocs = MINLOC( ptab(:,:) , mask= pmask == 1._wp )
1019      !
1020      ki = ilocs(1) + nimpp - 1
1021      kj = ilocs(2) + njmpp - 1
1022      !
1023      zain(1,:)=zmin
1024      zain(2,:)=ki+10000.*kj
1025      !
1026      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OPA,ierror)
1027      !
1028      pmin = zaout(1,1)
1029      kj = INT(zaout(2,1)/10000.)
1030      ki = INT(zaout(2,1) - 10000.*kj )
1031      !
1032   END SUBROUTINE mpp_minloc2d
1033
1034
1035   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj ,kk)
1036      !!------------------------------------------------------------------------
1037      !!             ***  routine mpp_minloc  ***
1038      !!
1039      !! ** Purpose :   Compute the global minimum of an array ptab
1040      !!              and also give its global position
1041      !!
1042      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1043      !!
1044      !!--------------------------------------------------------------------------
1045      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
1046      REAL(wp), DIMENSION(:,:,:), INTENT(in   ) ::   pmask        ! Local mask
1047      REAL(wp)                  , INTENT(  out) ::   pmin         ! Global minimum of ptab
1048      INTEGER                   , INTENT(  out) ::   ki, kj, kk   ! index of minimum in global frame
1049      !
1050      INTEGER  ::   ierror
1051      REAL(wp) ::   zmin     ! local minimum
1052      INTEGER , DIMENSION(3)   ::   ilocs
1053      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1054      !!-----------------------------------------------------------------------
1055      !
1056      zmin  = MINVAL( ptab(:,:,:) , mask= pmask == 1._wp )
1057      ilocs = MINLOC( ptab(:,:,:) , mask= pmask == 1._wp )
1058      !
1059      ki = ilocs(1) + nimpp - 1
1060      kj = ilocs(2) + njmpp - 1
1061      kk = ilocs(3)
1062      !
1063      zain(1,:) = zmin
1064      zain(2,:) = ki + 10000.*kj + 100000000.*kk
1065      !
1066      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OPA,ierror)
1067      !
1068      pmin = zaout(1,1)
1069      kk   = INT( zaout(2,1) / 100000000. )
1070      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
1071      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
1072      !
1073   END SUBROUTINE mpp_minloc3d
1074
1075
1076   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
1077      !!------------------------------------------------------------------------
1078      !!             ***  routine mpp_maxloc  ***
1079      !!
1080      !! ** Purpose :   Compute the global maximum of an array ptab
1081      !!              and also give its global position
1082      !!
1083      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1084      !!
1085      !!--------------------------------------------------------------------------
1086      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab     ! Local 2D array
1087      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask    ! Local mask
1088      REAL(wp)                     , INTENT(  out) ::   pmax     ! Global maximum of ptab
1089      INTEGER                      , INTENT(  out) ::   ki, kj   ! index of maximum in global frame
1090      !!
1091      INTEGER  :: ierror
1092      INTEGER, DIMENSION (2)   ::   ilocs
1093      REAL(wp) :: zmax   ! local maximum
1094      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1095      !!-----------------------------------------------------------------------
1096      !
1097      zmax  = MAXVAL( ptab(:,:) , mask= pmask == 1._wp )
1098      ilocs = MAXLOC( ptab(:,:) , mask= pmask == 1._wp )
1099      !
1100      ki = ilocs(1) + nimpp - 1
1101      kj = ilocs(2) + njmpp - 1
1102      !
1103      zain(1,:) = zmax
1104      zain(2,:) = ki + 10000. * kj
1105      !
1106      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OPA,ierror)
1107      !
1108      pmax = zaout(1,1)
1109      kj   = INT( zaout(2,1) / 10000.     )
1110      ki   = INT( zaout(2,1) - 10000.* kj )
1111      !
1112   END SUBROUTINE mpp_maxloc2d
1113
1114
1115   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
1116      !!------------------------------------------------------------------------
1117      !!             ***  routine mpp_maxloc  ***
1118      !!
1119      !! ** Purpose :  Compute the global maximum of an array ptab
1120      !!              and also give its global position
1121      !!
1122      !! ** Method : Use MPI_ALLREDUCE with MPI_MINLOC
1123      !!
1124      !!--------------------------------------------------------------------------
1125      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   ptab         ! Local 2D array
1126      REAL(wp), DIMENSION (:,:,:), INTENT(in   ) ::   pmask        ! Local mask
1127      REAL(wp)                   , INTENT(  out) ::   pmax         ! Global maximum of ptab
1128      INTEGER                    , INTENT(  out) ::   ki, kj, kk   ! index of maximum in global frame
1129      !
1130      INTEGER  ::   ierror   ! local integer
1131      REAL(wp) ::   zmax     ! local maximum
1132      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1133      INTEGER , DIMENSION(3)   ::   ilocs
1134      !!-----------------------------------------------------------------------
1135      !
1136      zmax  = MAXVAL( ptab(:,:,:) , mask= pmask == 1._wp )
1137      ilocs = MAXLOC( ptab(:,:,:) , mask= pmask == 1._wp )
1138      !
1139      ki = ilocs(1) + nimpp - 1
1140      kj = ilocs(2) + njmpp - 1
1141      kk = ilocs(3)
1142      !
1143      zain(1,:) = zmax
1144      zain(2,:) = ki + 10000.*kj + 100000000.*kk
1145      !
1146      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OPA,ierror)
1147      !
1148      pmax = zaout(1,1)
1149      kk   = INT( zaout(2,1) / 100000000. )
1150      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
1151      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
1152      !
1153   END SUBROUTINE mpp_maxloc3d
1154
1155
1156   SUBROUTINE mppsync()
1157      !!----------------------------------------------------------------------
1158      !!                  ***  routine mppsync  ***
1159      !!
1160      !! ** Purpose :   Massively parallel processors, synchroneous
1161      !!
1162      !!-----------------------------------------------------------------------
1163      INTEGER :: ierror
1164      !!-----------------------------------------------------------------------
1165      !
1166      CALL mpi_barrier( mpi_comm_opa, ierror )
1167      !
1168   END SUBROUTINE mppsync
1169
1170
1171   SUBROUTINE mppstop
1172      !!----------------------------------------------------------------------
1173      !!                  ***  routine mppstop  ***
1174      !!
1175      !! ** purpose :   Stop massively parallel processors method
1176      !!
1177      !!----------------------------------------------------------------------
1178      INTEGER ::   info
1179      !!----------------------------------------------------------------------
1180      !
1181      CALL mppsync
1182      CALL mpi_finalize( info )
1183      !
1184   END SUBROUTINE mppstop
1185
1186
1187   SUBROUTINE mpp_comm_free( kcom )
1188      !!----------------------------------------------------------------------
1189      INTEGER, INTENT(in) ::   kcom
1190      !!
1191      INTEGER :: ierr
1192      !!----------------------------------------------------------------------
1193      !
1194      CALL MPI_COMM_FREE(kcom, ierr)
1195      !
1196   END SUBROUTINE mpp_comm_free
1197
1198
1199   SUBROUTINE mpp_ini_ice( pindic, kumout )
1200      !!----------------------------------------------------------------------
1201      !!               ***  routine mpp_ini_ice  ***
1202      !!
1203      !! ** Purpose :   Initialize special communicator for ice areas
1204      !!      condition together with global variables needed in the ddmpp folding
1205      !!
1206      !! ** Method  : - Look for ice processors in ice routines
1207      !!              - Put their number in nrank_ice
1208      !!              - Create groups for the world processors and the ice processors
1209      !!              - Create a communicator for ice processors
1210      !!
1211      !! ** output
1212      !!      njmppmax = njmpp for northern procs
1213      !!      ndim_rank_ice = number of processors with ice
1214      !!      nrank_ice (ndim_rank_ice) = ice processors
1215      !!      ngrp_iworld = group ID for the world processors
1216      !!      ngrp_ice = group ID for the ice processors
1217      !!      ncomm_ice = communicator for the ice procs.
1218      !!      n_ice_root = number (in the world) of proc 0 in the ice comm.
1219      !!
1220      !!----------------------------------------------------------------------
1221      INTEGER, INTENT(in) ::   pindic
1222      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical unit
1223      !!
1224      INTEGER :: jjproc
1225      INTEGER :: ii, ierr
1226      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kice
1227      INTEGER, ALLOCATABLE, DIMENSION(:) ::   zwork
1228      !!----------------------------------------------------------------------
1229      !
1230      ! Since this is just an init routine and these arrays are of length jpnij
1231      ! then don't use wrk_nemo module - just allocate and deallocate.
1232      ALLOCATE( kice(jpnij), zwork(jpnij), STAT=ierr )
1233      IF( ierr /= 0 ) THEN
1234         WRITE(kumout, cform_err)
1235         WRITE(kumout,*) 'mpp_ini_ice : failed to allocate 2, 1D arrays (jpnij in length)'
1236         CALL mppstop
1237      ENDIF
1238
1239      ! Look for how many procs with sea-ice
1240      !
1241      kice = 0
1242      DO jjproc = 1, jpnij
1243         IF( jjproc == narea .AND. pindic .GT. 0 )   kice(jjproc) = 1
1244      END DO
1245      !
1246      zwork = 0
1247      CALL MPI_ALLREDUCE( kice, zwork, jpnij, mpi_integer, mpi_sum, mpi_comm_opa, ierr )
1248      ndim_rank_ice = SUM( zwork )
1249
1250      ! Allocate the right size to nrank_north
1251      IF( ALLOCATED ( nrank_ice ) )   DEALLOCATE( nrank_ice )
1252      ALLOCATE( nrank_ice(ndim_rank_ice) )
1253      !
1254      ii = 0
1255      nrank_ice = 0
1256      DO jjproc = 1, jpnij
1257         IF( zwork(jjproc) == 1) THEN
1258            ii = ii + 1
1259            nrank_ice(ii) = jjproc -1
1260         ENDIF
1261      END DO
1262
1263      ! Create the world group
1264      CALL MPI_COMM_GROUP( mpi_comm_opa, ngrp_iworld, ierr )
1265
1266      ! Create the ice group from the world group
1267      CALL MPI_GROUP_INCL( ngrp_iworld, ndim_rank_ice, nrank_ice, ngrp_ice, ierr )
1268
1269      ! Create the ice communicator , ie the pool of procs with sea-ice
1270      CALL MPI_COMM_CREATE( mpi_comm_opa, ngrp_ice, ncomm_ice, ierr )
1271
1272      ! Find proc number in the world of proc 0 in the north
1273      ! The following line seems to be useless, we just comment & keep it as reminder
1274      ! CALL MPI_GROUP_TRANSLATE_RANKS(ngrp_ice,1,0,ngrp_iworld,n_ice_root,ierr)
1275      !
1276      CALL MPI_GROUP_FREE(ngrp_ice, ierr)
1277      CALL MPI_GROUP_FREE(ngrp_iworld, ierr)
1278
1279      DEALLOCATE(kice, zwork)
1280      !
1281   END SUBROUTINE mpp_ini_ice
1282
1283
1284   SUBROUTINE mpp_ini_znl( kumout )
1285      !!----------------------------------------------------------------------
1286      !!               ***  routine mpp_ini_znl  ***
1287      !!
1288      !! ** Purpose :   Initialize special communicator for computing zonal sum
1289      !!
1290      !! ** Method  : - Look for processors in the same row
1291      !!              - Put their number in nrank_znl
1292      !!              - Create group for the znl processors
1293      !!              - Create a communicator for znl processors
1294      !!              - Determine if processor should write znl files
1295      !!
1296      !! ** output
1297      !!      ndim_rank_znl = number of processors on the same row
1298      !!      ngrp_znl = group ID for the znl processors
1299      !!      ncomm_znl = communicator for the ice procs.
1300      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1301      !!
1302      !!----------------------------------------------------------------------
1303      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1304      !
1305      INTEGER :: jproc      ! dummy loop integer
1306      INTEGER :: ierr, ii   ! local integer
1307      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1308      !!----------------------------------------------------------------------
1309      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1310      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1311      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_opa   : ', mpi_comm_opa
1312      !
1313      ALLOCATE( kwork(jpnij), STAT=ierr )
1314      IF( ierr /= 0 ) THEN
1315         WRITE(kumout, cform_err)
1316         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
1317         CALL mppstop
1318      ENDIF
1319
1320      IF( jpnj == 1 ) THEN
1321         ngrp_znl  = ngrp_world
1322         ncomm_znl = mpi_comm_opa
1323      ELSE
1324         !
1325         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_opa, ierr )
1326         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1327         !-$$        CALL flush(numout)
1328         !
1329         ! Count number of processors on the same row
1330         ndim_rank_znl = 0
1331         DO jproc=1,jpnij
1332            IF ( kwork(jproc) == njmpp ) THEN
1333               ndim_rank_znl = ndim_rank_znl + 1
1334            ENDIF
1335         END DO
1336         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1337         !-$$        CALL flush(numout)
1338         ! Allocate the right size to nrank_znl
1339         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1340         ALLOCATE(nrank_znl(ndim_rank_znl))
1341         ii = 0
1342         nrank_znl (:) = 0
1343         DO jproc=1,jpnij
1344            IF ( kwork(jproc) == njmpp) THEN
1345               ii = ii + 1
1346               nrank_znl(ii) = jproc -1
1347            ENDIF
1348         END DO
1349         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1350         !-$$        CALL flush(numout)
1351
1352         ! Create the opa group
1353         CALL MPI_COMM_GROUP(mpi_comm_opa,ngrp_opa,ierr)
1354         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1355         !-$$        CALL flush(numout)
1356
1357         ! Create the znl group from the opa group
1358         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1359         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1360         !-$$        CALL flush(numout)
1361
1362         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1363         CALL MPI_COMM_CREATE ( mpi_comm_opa, ngrp_znl, ncomm_znl, ierr )
1364         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1365         !-$$        CALL flush(numout)
1366         !
1367      END IF
1368
1369      ! Determines if processor if the first (starting from i=1) on the row
1370      IF ( jpni == 1 ) THEN
1371         l_znl_root = .TRUE.
1372      ELSE
1373         l_znl_root = .FALSE.
1374         kwork (1) = nimpp
1375         CALL mpp_min ( kwork(1), kcom = ncomm_znl)
1376         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1377      END IF
1378
1379      DEALLOCATE(kwork)
1380
1381   END SUBROUTINE mpp_ini_znl
1382
1383
1384   SUBROUTINE mpp_ini_north
1385      !!----------------------------------------------------------------------
1386      !!               ***  routine mpp_ini_north  ***
1387      !!
1388      !! ** Purpose :   Initialize special communicator for north folding
1389      !!      condition together with global variables needed in the mpp folding
1390      !!
1391      !! ** Method  : - Look for northern processors
1392      !!              - Put their number in nrank_north
1393      !!              - Create groups for the world processors and the north processors
1394      !!              - Create a communicator for northern processors
1395      !!
1396      !! ** output
1397      !!      njmppmax = njmpp for northern procs
1398      !!      ndim_rank_north = number of processors in the northern line
1399      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1400      !!      ngrp_world = group ID for the world processors
1401      !!      ngrp_north = group ID for the northern processors
1402      !!      ncomm_north = communicator for the northern procs.
1403      !!      north_root = number (in the world) of proc 0 in the northern comm.
1404      !!
1405      !!----------------------------------------------------------------------
1406      INTEGER ::   ierr
1407      INTEGER ::   jjproc
1408      INTEGER ::   ii, ji
1409      !!----------------------------------------------------------------------
1410      !
1411      njmppmax = MAXVAL( njmppt )
1412      !
1413      ! Look for how many procs on the northern boundary
1414      ndim_rank_north = 0
1415      DO jjproc = 1, jpnij
1416         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1417      END DO
1418      !
1419      ! Allocate the right size to nrank_north
1420      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1421      ALLOCATE( nrank_north(ndim_rank_north) )
1422
1423      ! Fill the nrank_north array with proc. number of northern procs.
1424      ! Note : the rank start at 0 in MPI
1425      ii = 0
1426      DO ji = 1, jpnij
1427         IF ( njmppt(ji) == njmppmax   ) THEN
1428            ii=ii+1
1429            nrank_north(ii)=ji-1
1430         END IF
1431      END DO
1432      !
1433      ! create the world group
1434      CALL MPI_COMM_GROUP( mpi_comm_opa, ngrp_world, ierr )
1435      !
1436      ! Create the North group from the world group
1437      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1438      !
1439      ! Create the North communicator , ie the pool of procs in the north group
1440      CALL MPI_COMM_CREATE( mpi_comm_opa, ngrp_north, ncomm_north, ierr )
1441      !
1442   END SUBROUTINE mpp_ini_north
1443
1444
1445   SUBROUTINE mpp_lbc_north_e( pt2d, cd_type, psgn)
1446      !!---------------------------------------------------------------------
1447      !!                   ***  routine mpp_lbc_north_2d  ***
1448      !!
1449      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1450      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1451      !!              array with outer extra halo
1452      !!
1453      !! ** Method  :   North fold condition and mpp with more than one proc
1454      !!              in i-direction require a specific treatment. We gather
1455      !!              the 4+2*jpr2dj northern lines of the global domain on 1
1456      !!              processor and apply lbc north-fold on this sub array.
1457      !!              Then we scatter the north fold array back to the processors.
1458      !!
1459      !!----------------------------------------------------------------------
1460      REAL(wp), DIMENSION(1-jpr2di:jpi+jpr2di,1-jpr2dj:jpj+jpr2dj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1461      CHARACTER(len=1)                                            , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1462      REAL(wp)                                                    , INTENT(in   ) ::   psgn     ! sign used across the north fold
1463      !
1464      INTEGER ::   ji, jj, jr
1465      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1466      INTEGER ::   ijpj, ij, iproc
1467      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1468      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1469      !!----------------------------------------------------------------------
1470      !
1471      ALLOCATE( ztab_e(jpiglo,4+2*jpr2dj), znorthloc_e(jpi,4+2*jpr2dj), znorthgloio_e(jpi,4+2*jpr2dj,jpni) )
1472      !
1473      ijpj=4
1474      ztab_e(:,:) = 0._wp
1475
1476      ij = 0
1477      ! put in znorthloc_e the last 4 jlines of pt2d
1478      DO jj = nlcj - ijpj + 1 - jpr2dj, nlcj +jpr2dj
1479         ij = ij + 1
1480         DO ji = 1, jpi
1481            znorthloc_e(ji,ij) = pt2d(ji,jj)
1482         END DO
1483      END DO
1484      !
1485      itaille = jpi * ( ijpj + 2 * jpr2dj )
1486      CALL MPI_ALLGATHER( znorthloc_e(1,1)    , itaille, MPI_DOUBLE_PRECISION,    &
1487         &                znorthgloio_e(1,1,1), itaille, MPI_DOUBLE_PRECISION, ncomm_north, ierr )
1488      !
1489      DO jr = 1, ndim_rank_north            ! recover the global north array
1490         iproc = nrank_north(jr) + 1
1491         ildi  = nldit (iproc)
1492         ilei  = nleit (iproc)
1493         iilb  = nimppt(iproc)
1494         DO jj = 1, ijpj+2*jpr2dj
1495            DO ji = ildi, ilei
1496               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1497            END DO
1498         END DO
1499      END DO
1500
1501      ! 2. North-Fold boundary conditions
1502      ! ----------------------------------
1503!!gm ERROR      CALL lbc_nfd( ztab_e(:,:), cd_type, psgn, pr2dj = jpr2dj )
1504
1505      ij = jpr2dj
1506      !! Scatter back to pt2d
1507      DO jj = nlcj - ijpj + 1 , nlcj +jpr2dj
1508      ij  = ij +1
1509         DO ji= 1, nlci
1510            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1511         END DO
1512      END DO
1513      !
1514      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1515      !
1516   END SUBROUTINE mpp_lbc_north_e
1517
1518
1519   SUBROUTINE mpi_init_opa( ldtxt, ksft, code )
1520      !!---------------------------------------------------------------------
1521      !!                   ***  routine mpp_init.opa  ***
1522      !!
1523      !! ** Purpose :: export and attach a MPI buffer for bsend
1524      !!
1525      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1526      !!            but classical mpi_init
1527      !!
1528      !! History :: 01/11 :: IDRIS initial version for IBM only
1529      !!            08/04 :: R. Benshila, generalisation
1530      !!---------------------------------------------------------------------
1531      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1532      INTEGER                      , INTENT(inout) ::   ksft
1533      INTEGER                      , INTENT(  out) ::   code
1534      INTEGER                                      ::   ierr, ji
1535      LOGICAL                                      ::   mpi_was_called
1536      !!---------------------------------------------------------------------
1537      !
1538      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1539      IF ( code /= MPI_SUCCESS ) THEN
1540         DO ji = 1, SIZE(ldtxt)
1541            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1542         END DO
1543         WRITE(*, cform_err)
1544         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1545         CALL mpi_abort( mpi_comm_world, code, ierr )
1546      ENDIF
1547      !
1548      IF( .NOT. mpi_was_called ) THEN
1549         CALL mpi_init( code )
1550         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_opa, code )
1551         IF ( code /= MPI_SUCCESS ) THEN
1552            DO ji = 1, SIZE(ldtxt)
1553               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1554            END DO
1555            WRITE(*, cform_err)
1556            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1557            CALL mpi_abort( mpi_comm_world, code, ierr )
1558         ENDIF
1559      ENDIF
1560      !
1561      IF( nn_buffer > 0 ) THEN
1562         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1563         ! Buffer allocation and attachment
1564         ALLOCATE( tampon(nn_buffer), stat = ierr )
1565         IF( ierr /= 0 ) THEN
1566            DO ji = 1, SIZE(ldtxt)
1567               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1568            END DO
1569            WRITE(*, cform_err)
1570            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1571            CALL mpi_abort( mpi_comm_world, code, ierr )
1572         END IF
1573         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1574      ENDIF
1575      !
1576   END SUBROUTINE mpi_init_opa
1577
1578
1579   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1580      !!---------------------------------------------------------------------
1581      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1582      !!
1583      !!   Modification of original codes written by David H. Bailey
1584      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1585      !!---------------------------------------------------------------------
1586      INTEGER                     , INTENT(in)    ::   ilen, itype
1587      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1588      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1589      !
1590      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1591      INTEGER  :: ji, ztmp           ! local scalar
1592      !!---------------------------------------------------------------------
1593      !
1594      ztmp = itype   ! avoid compilation warning
1595      !
1596      DO ji=1,ilen
1597      ! Compute ydda + yddb using Knuth's trick.
1598         zt1  = real(ydda(ji)) + real(yddb(ji))
1599         zerr = zt1 - real(ydda(ji))
1600         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1601                + aimag(ydda(ji)) + aimag(yddb(ji))
1602
1603         ! The result is zt1 + zt2, after normalization.
1604         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1605      END DO
1606      !
1607   END SUBROUTINE DDPDD_MPI
1608
1609
1610   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, pr2dj)
1611      !!---------------------------------------------------------------------
1612      !!                   ***  routine mpp_lbc_north_icb  ***
1613      !!
1614      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1615      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1616      !!              array with outer extra halo
1617      !!
1618      !! ** Method  :   North fold condition and mpp with more than one proc
1619      !!              in i-direction require a specific treatment. We gather
1620      !!              the 4+2*jpr2dj northern lines of the global domain on 1
1621      !!              processor and apply lbc north-fold on this sub array.
1622      !!              Then we scatter the north fold array back to the processors.
1623      !!              This version accounts for an extra halo with icebergs.
1624      !!
1625      !!----------------------------------------------------------------------
1626      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1627      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1628      !                                                     !   = T ,  U , V , F or W -points
1629      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1630      !!                                                    ! north fold, =  1. otherwise
1631      INTEGER, OPTIONAL       , INTENT(in   ) ::   pr2dj
1632      !
1633      INTEGER ::   ji, jj, jr
1634      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1635      INTEGER ::   ijpj, ij, iproc, ipr2dj
1636      !
1637      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1638      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1639      !!----------------------------------------------------------------------
1640      !
1641      ijpj=4
1642      IF( PRESENT(pr2dj) ) THEN           ! use of additional halos
1643         ipr2dj = pr2dj
1644      ELSE
1645         ipr2dj = 0
1646      ENDIF
1647      ALLOCATE( ztab_e(jpiglo,4+2*ipr2dj), znorthloc_e(jpi,4+2*ipr2dj), znorthgloio_e(jpi,4+2*ipr2dj,jpni) )
1648      !
1649      ztab_e(:,:) = 0._wp
1650      !
1651      ij = 0
1652      ! put in znorthloc_e the last 4 jlines of pt2d
1653      DO jj = nlcj - ijpj + 1 - ipr2dj, nlcj +ipr2dj
1654         ij = ij + 1
1655         DO ji = 1, jpi
1656            znorthloc_e(ji,ij)=pt2d(ji,jj)
1657         END DO
1658      END DO
1659      !
1660      itaille = jpi * ( ijpj + 2 * ipr2dj )
1661      CALL MPI_ALLGATHER( znorthloc_e(1,1)  , itaille, MPI_DOUBLE_PRECISION,    &
1662         &                znorthgloio_e(1,1,1), itaille, MPI_DOUBLE_PRECISION, ncomm_north, ierr )
1663      !
1664      DO jr = 1, ndim_rank_north            ! recover the global north array
1665         iproc = nrank_north(jr) + 1
1666         ildi = nldit (iproc)
1667         ilei = nleit (iproc)
1668         iilb = nimppt(iproc)
1669         DO jj = 1, ijpj+2*ipr2dj
1670            DO ji = ildi, ilei
1671               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1672            END DO
1673         END DO
1674      END DO
1675
1676      ! 2. North-Fold boundary conditions
1677      ! ----------------------------------
1678!!gm ERROR      CALL lbc_nfd( ztab_e(:,:), cd_type, psgn, pr2dj = ipr2dj )
1679
1680      ij = ipr2dj
1681      !! Scatter back to pt2d
1682      DO jj = nlcj - ijpj + 1 , nlcj +ipr2dj
1683      ij  = ij +1
1684         DO ji= 1, nlci
1685            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1686         END DO
1687      END DO
1688      !
1689      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1690      !
1691   END SUBROUTINE mpp_lbc_north_icb
1692
1693
1694   SUBROUTINE mpp_lnk_2d_icb( pt2d, cd_type, psgn, jpri, jprj )
1695      !!----------------------------------------------------------------------
1696      !!                  ***  routine mpp_lnk_2d_icb  ***
1697      !!
1698      !! ** Purpose :   Message passing manadgement for 2d array (with extra halo and icebergs)
1699      !!
1700      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1701      !!      between processors following neighboring subdomains.
1702      !!            domain parameters
1703      !!                    nlci   : first dimension of the local subdomain
1704      !!                    nlcj   : second dimension of the local subdomain
1705      !!                    jpri   : number of rows for extra outer halo
1706      !!                    jprj   : number of columns for extra outer halo
1707      !!                    nbondi : mark for "east-west local boundary"
1708      !!                    nbondj : mark for "north-south local boundary"
1709      !!                    noea   : number for local neighboring processors
1710      !!                    nowe   : number for local neighboring processors
1711      !!                    noso   : number for local neighboring processors
1712      !!                    nono   : number for local neighboring processors
1713      !!----------------------------------------------------------------------
1714      REAL(wp), DIMENSION(1-jpri:jpi+jpri,1-jprj:jpj+jprj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1715      CHARACTER(len=1)                                    , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1716      REAL(wp)                                            , INTENT(in   ) ::   psgn     ! sign used across the north fold
1717      INTEGER                                             , INTENT(in   ) ::   jpri
1718      INTEGER                                             , INTENT(in   ) ::   jprj
1719      !
1720      INTEGER  ::   jl   ! dummy loop indices
1721      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1722      INTEGER  ::   ipreci, iprecj             !   -       -
1723      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1724      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1725      !!
1726      REAL(wp), DIMENSION(1-jpri:jpi+jpri,nn_hls+jprj,2) ::   r2dns, r2dsn
1727      REAL(wp), DIMENSION(1-jprj:jpj+jprj,nn_hls+jpri,2) ::   r2dwe, r2dew
1728      !!----------------------------------------------------------------------
1729
1730      ipreci = nn_hls + jpri      ! take into account outer extra 2D overlap area
1731      iprecj = nn_hls + jprj
1732
1733
1734      ! 1. standard boundary treatment
1735      ! ------------------------------
1736      ! Order matters Here !!!!
1737      !
1738      !                                      ! East-West boundaries
1739      !                                           !* Cyclic east-west
1740      IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
1741         pt2d(1-jpri:     1    ,:) = pt2d(jpim1-jpri:  jpim1 ,:)       ! east
1742         pt2d(   jpi  :jpi+jpri,:) = pt2d(     2      :2+jpri,:)       ! west
1743         !
1744      ELSE                                        !* closed
1745         IF( .NOT. cd_type == 'F' )   pt2d(  1-jpri   :nn_hls    ,:) = 0._wp    ! south except at F-point
1746                                      pt2d(nlci-nn_hls+1:jpi+jpri,:) = 0._wp    ! north
1747      ENDIF
1748      !
1749
1750      ! north fold treatment
1751      ! -----------------------
1752      IF( npolj /= 0 ) THEN
1753         !
1754         SELECT CASE ( jpni )
1755!!gm ERROR         CASE ( 1 )     ;   CALL lbc_nfd        ( pt2d(1:jpi,1:jpj+jprj), cd_type, psgn, pr2dj=jprj )
1756                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+jprj)  , cd_type, psgn , pr2dj=jprj  )
1757         END SELECT
1758         !
1759      ENDIF
1760
1761      ! 2. East and west directions exchange
1762      ! ------------------------------------
1763      ! we play with the neigbours AND the row number because of the periodicity
1764      !
1765      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1766      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1767         iihom = nlci-nreci-jpri
1768         DO jl = 1, ipreci
1769            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1770            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1771         END DO
1772      END SELECT
1773      !
1774      !                           ! Migrations
1775      imigr = ipreci * ( jpj + 2*jprj)
1776      !
1777      SELECT CASE ( nbondi )
1778      CASE ( -1 )
1779         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req1 )
1780         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
1781         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1782      CASE ( 0 )
1783         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
1784         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req2 )
1785         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
1786         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
1787         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1788         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1789      CASE ( 1 )
1790         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
1791         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
1792         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1793      END SELECT
1794      !
1795      !                           ! Write Dirichlet lateral conditions
1796      iihom = nlci - nn_hls
1797      !
1798      SELECT CASE ( nbondi )
1799      CASE ( -1 )
1800         DO jl = 1, ipreci
1801            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1802         END DO
1803      CASE ( 0 )
1804         DO jl = 1, ipreci
1805            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
1806            pt2d( iihom+jl,:) = r2dew(:,jl,2)
1807         END DO
1808      CASE ( 1 )
1809         DO jl = 1, ipreci
1810            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
1811         END DO
1812      END SELECT
1813
1814
1815      ! 3. North and south directions
1816      ! -----------------------------
1817      ! always closed : we play only with the neigbours
1818      !
1819      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1820         ijhom = nlcj-nrecj-jprj
1821         DO jl = 1, iprecj
1822            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1823            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1824         END DO
1825      ENDIF
1826      !
1827      !                           ! Migrations
1828      imigr = iprecj * ( jpi + 2*jpri )
1829      !
1830      SELECT CASE ( nbondj )
1831      CASE ( -1 )
1832         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req1 )
1833         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
1834         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1835      CASE ( 0 )
1836         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
1837         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req2 )
1838         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
1839         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
1840         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1841         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1842      CASE ( 1 )
1843         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
1844         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
1845         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1846      END SELECT
1847      !
1848      !                           ! Write Dirichlet lateral conditions
1849      ijhom = nlcj - nn_hls
1850      !
1851      SELECT CASE ( nbondj )
1852      CASE ( -1 )
1853         DO jl = 1, iprecj
1854            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1855         END DO
1856      CASE ( 0 )
1857         DO jl = 1, iprecj
1858            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
1859            pt2d(:,ijhom+jl ) = r2dns(:,jl,2)
1860         END DO
1861      CASE ( 1 )
1862         DO jl = 1, iprecj
1863            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
1864         END DO
1865      END SELECT
1866      !
1867   END SUBROUTINE mpp_lnk_2d_icb
1868   
1869#else
1870   !!----------------------------------------------------------------------
1871   !!   Default case:            Dummy module        share memory computing
1872   !!----------------------------------------------------------------------
1873   USE in_out_manager
1874
1875   INTERFACE mpp_sum
1876      MODULE PROCEDURE mpp_sum_a2s, mpp_sum_as, mpp_sum_ai, mpp_sum_s, mpp_sum_i, mppsum_realdd, mppsum_a_realdd
1877   END INTERFACE
1878   INTERFACE mpp_max
1879      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1880   END INTERFACE
1881   INTERFACE mpp_min
1882      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1883   END INTERFACE
1884   INTERFACE mpp_minloc
1885      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1886   END INTERFACE
1887   INTERFACE mpp_maxloc
1888      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1889   END INTERFACE
1890   INTERFACE mpp_max_multiple
1891      MODULE PROCEDURE mppmax_real_multiple
1892   END INTERFACE
1893
1894   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1895   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1896   INTEGER :: ncomm_ice
1897   INTEGER, PUBLIC            ::   mpi_comm_opa          ! opa local communicator
1898   !!----------------------------------------------------------------------
1899CONTAINS
1900
1901   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1902      INTEGER, INTENT(in) ::   kumout
1903      lib_mpp_alloc = 0
1904   END FUNCTION lib_mpp_alloc
1905
1906   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1907      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1908      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1909      CHARACTER(len=*) ::   ldname
1910      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1911      IF( PRESENT( localComm ) ) mpi_comm_opa = localComm
1912      function_value = 0
1913      IF( .FALSE. )   ldtxt(:) = 'never done'
1914      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1915   END FUNCTION mynode
1916
1917   SUBROUTINE mppsync                       ! Dummy routine
1918   END SUBROUTINE mppsync
1919
1920   SUBROUTINE mpp_sum_as( parr, kdim, kcom )      ! Dummy routine
1921      REAL   , DIMENSION(:) :: parr
1922      INTEGER               :: kdim
1923      INTEGER, OPTIONAL     :: kcom
1924      WRITE(*,*) 'mpp_sum_as: You should not have seen this print! error?', kdim, parr(1), kcom
1925   END SUBROUTINE mpp_sum_as
1926
1927   SUBROUTINE mpp_sum_a2s( parr, kdim, kcom )      ! Dummy routine
1928      REAL   , DIMENSION(:,:) :: parr
1929      INTEGER               :: kdim
1930      INTEGER, OPTIONAL     :: kcom
1931      WRITE(*,*) 'mpp_sum_a2s: You should not have seen this print! error?', kdim, parr(1,1), kcom
1932   END SUBROUTINE mpp_sum_a2s
1933
1934   SUBROUTINE mpp_sum_ai( karr, kdim, kcom )      ! Dummy routine
1935      INTEGER, DIMENSION(:) :: karr
1936      INTEGER               :: kdim
1937      INTEGER, OPTIONAL     :: kcom
1938      WRITE(*,*) 'mpp_sum_ai: You should not have seen this print! error?', kdim, karr(1), kcom
1939   END SUBROUTINE mpp_sum_ai
1940
1941   SUBROUTINE mpp_sum_s( psca, kcom )            ! Dummy routine
1942      REAL                  :: psca
1943      INTEGER, OPTIONAL     :: kcom
1944      WRITE(*,*) 'mpp_sum_s: You should not have seen this print! error?', psca, kcom
1945   END SUBROUTINE mpp_sum_s
1946
1947   SUBROUTINE mpp_sum_i( kint, kcom )            ! Dummy routine
1948      integer               :: kint
1949      INTEGER, OPTIONAL     :: kcom
1950      WRITE(*,*) 'mpp_sum_i: You should not have seen this print! error?', kint, kcom
1951   END SUBROUTINE mpp_sum_i
1952
1953   SUBROUTINE mppsum_realdd( ytab, kcom )
1954      COMPLEX(wp), INTENT(inout)         :: ytab    ! input scalar
1955      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1956      WRITE(*,*) 'mppsum_realdd: You should not have seen this print! error?', ytab
1957   END SUBROUTINE mppsum_realdd
1958
1959   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
1960      INTEGER , INTENT( in )                        ::   kdim      ! size of ytab
1961      COMPLEX(wp), DIMENSION(kdim), INTENT( inout ) ::   ytab      ! input array
1962      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1963      WRITE(*,*) 'mppsum_a_realdd: You should not have seen this print! error?', kdim, ytab(1), kcom
1964   END SUBROUTINE mppsum_a_realdd
1965
1966   SUBROUTINE mppmax_a_real( parr, kdim, kcom )
1967      REAL   , DIMENSION(:) :: parr
1968      INTEGER               :: kdim
1969      INTEGER, OPTIONAL     :: kcom
1970      WRITE(*,*) 'mppmax_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
1971   END SUBROUTINE mppmax_a_real
1972
1973   SUBROUTINE mppmax_real( psca, kcom )
1974      REAL                  :: psca
1975      INTEGER, OPTIONAL     :: kcom
1976      WRITE(*,*) 'mppmax_real: You should not have seen this print! error?', psca, kcom
1977   END SUBROUTINE mppmax_real
1978
1979   SUBROUTINE mppmin_a_real( parr, kdim, kcom )
1980      REAL   , DIMENSION(:) :: parr
1981      INTEGER               :: kdim
1982      INTEGER, OPTIONAL     :: kcom
1983      WRITE(*,*) 'mppmin_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
1984   END SUBROUTINE mppmin_a_real
1985
1986   SUBROUTINE mppmin_real( psca, kcom )
1987      REAL                  :: psca
1988      INTEGER, OPTIONAL     :: kcom
1989      WRITE(*,*) 'mppmin_real: You should not have seen this print! error?', psca, kcom
1990   END SUBROUTINE mppmin_real
1991
1992   SUBROUTINE mppmax_a_int( karr, kdim ,kcom)
1993      INTEGER, DIMENSION(:) :: karr
1994      INTEGER               :: kdim
1995      INTEGER, OPTIONAL     :: kcom
1996      WRITE(*,*) 'mppmax_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
1997   END SUBROUTINE mppmax_a_int
1998
1999   SUBROUTINE mppmax_int( kint, kcom)
2000      INTEGER               :: kint
2001      INTEGER, OPTIONAL     :: kcom
2002      WRITE(*,*) 'mppmax_int: You should not have seen this print! error?', kint, kcom
2003   END SUBROUTINE mppmax_int
2004
2005   SUBROUTINE mppmin_a_int( karr, kdim, kcom )
2006      INTEGER, DIMENSION(:) :: karr
2007      INTEGER               :: kdim
2008      INTEGER, OPTIONAL     :: kcom
2009      WRITE(*,*) 'mppmin_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
2010   END SUBROUTINE mppmin_a_int
2011
2012   SUBROUTINE mppmin_int( kint, kcom )
2013      INTEGER               :: kint
2014      INTEGER, OPTIONAL     :: kcom
2015      WRITE(*,*) 'mppmin_int: You should not have seen this print! error?', kint, kcom
2016   END SUBROUTINE mppmin_int
2017
2018   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki, kj )
2019      REAL                   :: pmin
2020      REAL , DIMENSION (:,:) :: ptab, pmask
2021      INTEGER :: ki, kj
2022      WRITE(*,*) 'mpp_minloc2d: You should not have seen this print! error?', pmin, ki, kj, ptab(1,1), pmask(1,1)
2023   END SUBROUTINE mpp_minloc2d
2024
2025   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj, kk )
2026      REAL                     :: pmin
2027      REAL , DIMENSION (:,:,:) :: ptab, pmask
2028      INTEGER :: ki, kj, kk
2029      WRITE(*,*) 'mpp_minloc3d: You should not have seen this print! error?', pmin, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
2030   END SUBROUTINE mpp_minloc3d
2031
2032   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
2033      REAL                   :: pmax
2034      REAL , DIMENSION (:,:) :: ptab, pmask
2035      INTEGER :: ki, kj
2036      WRITE(*,*) 'mpp_maxloc2d: You should not have seen this print! error?', pmax, ki, kj, ptab(1,1), pmask(1,1)
2037   END SUBROUTINE mpp_maxloc2d
2038
2039   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
2040      REAL                     :: pmax
2041      REAL , DIMENSION (:,:,:) :: ptab, pmask
2042      INTEGER :: ki, kj, kk
2043      WRITE(*,*) 'mpp_maxloc3d: You should not have seen this print! error?', pmax, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
2044   END SUBROUTINE mpp_maxloc3d
2045
2046   SUBROUTINE mppstop
2047      STOP      ! non MPP case, just stop the run
2048   END SUBROUTINE mppstop
2049
2050   SUBROUTINE mpp_ini_ice( kcom, knum )
2051      INTEGER :: kcom, knum
2052      WRITE(*,*) 'mpp_ini_ice: You should not have seen this print! error?', kcom, knum
2053   END SUBROUTINE mpp_ini_ice
2054
2055   SUBROUTINE mpp_ini_znl( knum )
2056      INTEGER :: knum
2057      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
2058   END SUBROUTINE mpp_ini_znl
2059
2060   SUBROUTINE mpp_comm_free( kcom )
2061      INTEGER :: kcom
2062      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
2063   END SUBROUTINE mpp_comm_free
2064   
2065   SUBROUTINE mppmax_real_multiple( ptab, kdim , kcom  )
2066      REAL, DIMENSION(:) ::   ptab   !
2067      INTEGER            ::   kdim   !
2068      INTEGER, OPTIONAL  ::   kcom   !
2069      WRITE(*,*) 'mppmax_real_multiple: You should not have seen this print! error?', ptab(1), kdim
2070   END SUBROUTINE mppmax_real_multiple
2071
2072#endif
2073
2074   !!----------------------------------------------------------------------
2075   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
2076   !!----------------------------------------------------------------------
2077
2078   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
2079      &                 cd6, cd7, cd8, cd9, cd10 )
2080      !!----------------------------------------------------------------------
2081      !!                  ***  ROUTINE  stop_opa  ***
2082      !!
2083      !! ** Purpose :   print in ocean.outpput file a error message and
2084      !!                increment the error number (nstop) by one.
2085      !!----------------------------------------------------------------------
2086      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
2087      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
2088      !!----------------------------------------------------------------------
2089      !
2090      nstop = nstop + 1
2091      IF(lwp) THEN
2092         WRITE(numout,cform_err)
2093         IF( PRESENT(cd1 ) )   WRITE(numout,*) cd1
2094         IF( PRESENT(cd2 ) )   WRITE(numout,*) cd2
2095         IF( PRESENT(cd3 ) )   WRITE(numout,*) cd3
2096         IF( PRESENT(cd4 ) )   WRITE(numout,*) cd4
2097         IF( PRESENT(cd5 ) )   WRITE(numout,*) cd5
2098         IF( PRESENT(cd6 ) )   WRITE(numout,*) cd6
2099         IF( PRESENT(cd7 ) )   WRITE(numout,*) cd7
2100         IF( PRESENT(cd8 ) )   WRITE(numout,*) cd8
2101         IF( PRESENT(cd9 ) )   WRITE(numout,*) cd9
2102         IF( PRESENT(cd10) )   WRITE(numout,*) cd10
2103      ENDIF
2104                               CALL FLUSH(numout    )
2105      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
2106      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
2107      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
2108      !
2109      IF( cd1 == 'STOP' ) THEN
2110         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
2111         CALL mppstop()
2112      ENDIF
2113      !
2114   END SUBROUTINE ctl_stop
2115
2116
2117   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
2118      &                 cd6, cd7, cd8, cd9, cd10 )
2119      !!----------------------------------------------------------------------
2120      !!                  ***  ROUTINE  stop_warn  ***
2121      !!
2122      !! ** Purpose :   print in ocean.outpput file a error message and
2123      !!                increment the warning number (nwarn) by one.
2124      !!----------------------------------------------------------------------
2125      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
2126      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
2127      !!----------------------------------------------------------------------
2128      !
2129      nwarn = nwarn + 1
2130      IF(lwp) THEN
2131         WRITE(numout,cform_war)
2132         IF( PRESENT(cd1 ) ) WRITE(numout,*) cd1
2133         IF( PRESENT(cd2 ) ) WRITE(numout,*) cd2
2134         IF( PRESENT(cd3 ) ) WRITE(numout,*) cd3
2135         IF( PRESENT(cd4 ) ) WRITE(numout,*) cd4
2136         IF( PRESENT(cd5 ) ) WRITE(numout,*) cd5
2137         IF( PRESENT(cd6 ) ) WRITE(numout,*) cd6
2138         IF( PRESENT(cd7 ) ) WRITE(numout,*) cd7
2139         IF( PRESENT(cd8 ) ) WRITE(numout,*) cd8
2140         IF( PRESENT(cd9 ) ) WRITE(numout,*) cd9
2141         IF( PRESENT(cd10) ) WRITE(numout,*) cd10
2142      ENDIF
2143      CALL FLUSH(numout)
2144      !
2145   END SUBROUTINE ctl_warn
2146
2147
2148   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
2149      !!----------------------------------------------------------------------
2150      !!                  ***  ROUTINE ctl_opn  ***
2151      !!
2152      !! ** Purpose :   Open file and check if required file is available.
2153      !!
2154      !! ** Method  :   Fortan open
2155      !!----------------------------------------------------------------------
2156      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
2157      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
2158      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
2159      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
2160      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
2161      INTEGER          , INTENT(in   ) ::   klengh    ! record length
2162      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
2163      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
2164      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
2165      !
2166      CHARACTER(len=80) ::   clfile
2167      INTEGER           ::   iost
2168      !!----------------------------------------------------------------------
2169      !
2170      ! adapt filename
2171      ! ----------------
2172      clfile = TRIM(cdfile)
2173      IF( PRESENT( karea ) ) THEN
2174         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
2175      ENDIF
2176#if defined key_agrif
2177      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
2178      knum=Agrif_Get_Unit()
2179#else
2180      knum=get_unit()
2181#endif
2182      !
2183      iost=0
2184      IF( cdacce(1:6) == 'DIRECT' )  THEN
2185         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
2186      ELSE
2187         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
2188      ENDIF
2189      IF( iost == 0 ) THEN
2190         IF(ldwp) THEN
2191            WRITE(kout,*) '     file   : ', clfile,' open ok'
2192            WRITE(kout,*) '     unit   = ', knum
2193            WRITE(kout,*) '     status = ', cdstat
2194            WRITE(kout,*) '     form   = ', cdform
2195            WRITE(kout,*) '     access = ', cdacce
2196            WRITE(kout,*)
2197         ENDIF
2198      ENDIF
2199100   CONTINUE
2200      IF( iost /= 0 ) THEN
2201         IF(ldwp) THEN
2202            WRITE(kout,*)
2203            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
2204            WRITE(kout,*) ' =======   ===  '
2205            WRITE(kout,*) '           unit   = ', knum
2206            WRITE(kout,*) '           status = ', cdstat
2207            WRITE(kout,*) '           form   = ', cdform
2208            WRITE(kout,*) '           access = ', cdacce
2209            WRITE(kout,*) '           iostat = ', iost
2210            WRITE(kout,*) '           we stop. verify the file '
2211            WRITE(kout,*)
2212         ENDIF
2213         CALL FLUSH( kout ) 
2214         STOP 'ctl_opn bad opening'
2215      ENDIF
2216      !
2217   END SUBROUTINE ctl_opn
2218
2219
2220   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2221      !!----------------------------------------------------------------------
2222      !!                  ***  ROUTINE ctl_nam  ***
2223      !!
2224      !! ** Purpose :   Informations when error while reading a namelist
2225      !!
2226      !! ** Method  :   Fortan open
2227      !!----------------------------------------------------------------------
2228      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2229      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2230      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2231      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2232      !!----------------------------------------------------------------------
2233      !
2234      WRITE (clios, '(I5.0)')   kios
2235      IF( kios < 0 ) THEN         
2236         CALL ctl_warn( 'end of record or file while reading namelist '   &
2237            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2238      ENDIF
2239      !
2240      IF( kios > 0 ) THEN
2241         CALL ctl_stop( 'misspelled variable in namelist '   &
2242            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2243      ENDIF
2244      kios = 0
2245      RETURN
2246      !
2247   END SUBROUTINE ctl_nam
2248
2249
2250   INTEGER FUNCTION get_unit()
2251      !!----------------------------------------------------------------------
2252      !!                  ***  FUNCTION  get_unit  ***
2253      !!
2254      !! ** Purpose :   return the index of an unused logical unit
2255      !!----------------------------------------------------------------------
2256      LOGICAL :: llopn
2257      !!----------------------------------------------------------------------
2258      !
2259      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2260      llopn = .TRUE.
2261      DO WHILE( (get_unit < 998) .AND. llopn )
2262         get_unit = get_unit + 1
2263         INQUIRE( unit = get_unit, opened = llopn )
2264      END DO
2265      IF( (get_unit == 999) .AND. llopn ) THEN
2266         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2267         get_unit = -1
2268      ENDIF
2269      !
2270   END FUNCTION get_unit
2271
2272   !!----------------------------------------------------------------------
2273END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.