
# mypy: allow-untyped-defs
"""
Traceable, functional collectives.
RFC: https://github.com/pytorch/pytorch/issues/93173

These wrappers run real collectives in eager mode (returning async tensors)
and trace into plain functional ops under the compiler.
"""

import sys
import warnings
from typing import cast, List, Optional, Tuple, TYPE_CHECKING, Union

import torch
import torch.distributed as dist
import torch.distributed.distributed_c10d as c10d
from torch.distributed.device_mesh import DeviceMesh
from torch.fx.experimental.proxy_tensor import get_proxy_mode

from . import _functional_collectives_impl as fun_col_impl


try:
    from torch.utils._cxx_pytree import tree_map_only
except ImportError:
    from torch.utils._pytree import tree_map_only  # type: ignore[no-redef]


if torch._running_with_deploy():

    def is_torchdynamo_compiling():
        """Can't import torchdynamo in torchdeploy builds currently."""
        return False

else:
    try:
        from torch.compiler import is_dynamo_compiling as is_torchdynamo_compiling
    except Exception:
        warnings.warn(
            "Unable to import torchdynamo util `is_torchdynamo_compiling`, "
            "so won't support torchdynamo correctly"
        )

        def is_torchdynamo_compiling():
            return False


# The accepted ways to specify the group a collective runs over: a rank list,
# a 2D rank list (one collective per sub-list), a ProcessGroup, a DeviceMesh,
# a (DeviceMesh, dim) pair, or a group name.
RANK_TYPES = Union[
    List[int],
    List[List[int]],
    dist.ProcessGroup,
    DeviceMesh,
    Tuple["dist.tensor.DeviceMesh", int],
    str,
]

def wait_tensor(tensor):
    """
    Wait on a tensor returned by the collectives ops.

    Waiting follows device semantics, which means blocking on CPU and synchronizing streams on CUDA.
    """
    return torch.ops._c10d_functional.wait_tensor(tensor)  # type: ignore[attr-defined]

def broadcast(
    self: torch.Tensor, src: int, group: RANK_TYPES, tag: str = ""
) -> torch.Tensor:
    """
    Broadcasts the tensor to all processes in the given process group.

    Args:
        src (int): Source rank
        group (ProcessGroup or List[int]): The process group to work on.
        tag (str, optional): A unique identifier for the collective. Default: empty string
    """
    group_name = _resolve_group_name(group, tag)
    tensor = torch.ops._c10d_functional.broadcast(self, src, group_name)
    return _maybe_wrap_tensor(tensor)

def all_reduce(
    self: torch.Tensor, reduceOp: str, group: RANK_TYPES, tag: str = ""
) -> torch.Tensor:
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result.

    The input tensor is left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    tensor = torch.ops._c10d_functional.all_reduce(self, reduceOp.lower(), group_name)
    return _maybe_wrap_tensor(tensor)

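# Illustrative sketch (not part of the upstream module): all_reduce over a 1D
# DeviceMesh. With an input full of ones and the "sum" op, every element of
# the result equals the mesh's world size; the mesh handle is an assumption
# supplied by the caller, not a fixture provided by this file.
def _example_all_reduce(mesh: DeviceMesh) -> torch.Tensor:
    x = torch.ones(8)
    y = all_reduce(x, "sum", mesh)  # SPMD collective over a 1D DeviceMesh
    return y  # each element equals mesh.size(); `x` is unchanged

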
def all_gather_tensor(
    self: torch.Tensor,
    gather_dim: int,
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Gather tensor data from all machines and concatenate over ``gather_dim``.

    Note that ``gather_dim != 0`` is emulated: data is gathered on dim 0 and
    then rearranged with chunk + cat, which forces an early wait.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    assert self.is_contiguous()
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    tensor = torch.ops._c10d_functional.all_gather_into_tensor(
        self, group_size, group_name
    )
    res = _maybe_wrap_tensor(tensor)
    if gather_dim != 0:
        # torch.cat accesses the data, so the wait must happen here already;
        # in the future the chunk + cat could be made lazy as well.
        if isinstance(res, AsyncCollectiveTensor):
            res = res.wait()
        res = torch.cat(torch.chunk(res, group_size, dim=0), dim=gather_dim)
    return res

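# Illustrative sketch (not part of the upstream module): shape behavior of
# all_gather_tensor. Gathering a [2, 4] shard over a group of size W yields
# [2 * W, 4] for gather_dim=0, and [2, 4 * W] for gather_dim=1 (the latter
# waits eagerly inside due to the chunk + cat rearrangement).
def _example_all_gather_shapes(mesh: DeviceMesh) -> None:
    shard = torch.randn(2, 4)
    ws = mesh.size()
    g0 = all_gather_tensor(shard, 0, mesh)
    g1 = all_gather_tensor(shard, 1, mesh)
    assert wait_tensor(g0).shape == (2 * ws, 4)
    assert g1.shape == (2, 4 * ws)  # already waited because gather_dim != 0

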
def all_gather_tensor_autograd(
    self: torch.Tensor,
    gather_dim: int,
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Gather tensor data from all machines and concatenate over ``gather_dim``.

    Note that it currently only supports gather_dim = 0.

    This function is the same as all_gather_tensor but will propagate the
    backwards gradient across workers.

    See all_gather_tensor for more details on usage.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    tensor = torch.ops._c10d_functional_autograd.all_gather_into_tensor(
        self, group_size, group_name
    )
    res = _FromTorchTensor.apply(tensor)
    if gather_dim != 0:
        # torch.cat accesses the data, so the wait must happen here already;
        # in the future the chunk + cat could be made lazy as well.
        if isinstance(res, AsyncCollectiveTensor):
            res = res.wait()
        res = torch.cat(torch.chunk(res, group_size, dim=0), dim=gather_dim)
    return res

def reduce_scatter_tensor(
    self: torch.Tensor,
    reduceOp: str,
    scatter_dim: int,
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result, then scatters the results to corresponding ranks.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh
    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    assert (
        self.size(scatter_dim) % group_size == 0
    ), f"input dimension 0 ({self.size(0)}) must be a multiple of group_size {group_size}"
    if scatter_dim != 0:
        tensor_list = torch.chunk(self, group_size, dim=scatter_dim)
        self = torch.cat(tensor_list)

    tensor = torch.ops._c10d_functional.reduce_scatter_tensor(
        self,
        reduceOp.lower(),
        group_size,
        group_name,
    )
    res = _maybe_wrap_tensor(tensor)
    return res

def reduce_scatter_tensor_autograd(
    self: torch.Tensor,
    reduceOp: str,
    scatter_dim: int,
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result, then scatters the results to corresponding ranks.

    This function is the same as reduce_scatter_tensor but will propagate the
    backwards gradient across workers.

    Currently only the "sum" reduceOp is supported.

    See reduce_scatter_tensor for more details on usage.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    assert (
        self.size(scatter_dim) % group_size == 0
    ), f"input dimension 0 ({self.size(0)}) must be a multiple of group_size {group_size}"
    if scatter_dim != 0:
        tensor_list = torch.chunk(self, group_size, dim=scatter_dim)
        self = torch.cat(tensor_list)

    tensor = torch.ops._c10d_functional_autograd.reduce_scatter_tensor(
        self,
        reduceOp.lower(),
        group_size,
        group_name,
    )
    res = _FromTorchTensor.apply(tensor)
    return res

def all_reduce_coalesced(
    self: List[torch.Tensor], reduceOp: str, group: RANK_TYPES, tag: str = ""
) -> List[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result.

    All tensors in the input list are left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    tensor_list = torch.ops._c10d_functional.all_reduce_coalesced(  # type: ignore[attr-defined]
        self,
        reduceOp.lower(),
        group_name,
    )
    return list(map(_maybe_wrap_tensor, tensor_list))

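# Illustrative sketch (not part of the upstream module): the coalesced variant
# issues one fused collective for several tensors, which is typically cheaper
# than a Python-level loop of individual all_reduce calls.
def _example_all_reduce_coalesced(mesh: DeviceMesh) -> List[torch.Tensor]:
    grads = [torch.ones(4), torch.ones(2, 2)]
    return all_reduce_coalesced(grads, "avg", mesh)  # one output per input

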
def all_gather_into_tensor_coalesced(
    self: List[torch.Tensor], group: RANK_TYPES, tag: str = ""
) -> List[torch.Tensor]:
    """
    Gather a list of tensors from all machines.

    Note that it currently only supports gather_dim = 0.

    The input tensors are left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    tensor_list = torch.ops._c10d_functional.all_gather_into_tensor_coalesced(  # type: ignore[attr-defined]
        self,
        group_size,
        group_name,
    )
    return list(map(_maybe_wrap_tensor, tensor_list))

def reduce_scatter_tensor_coalesced(
    inputs: List[torch.Tensor],
    reduceOp: str,
    scatter_dim: List[int],
    group: RANK_TYPES,
    tag: str = "",
) -> List[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result, then scatters the results to corresponding ranks.

    The input tensors are left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    assert len(scatter_dim) == len(inputs)
    for idx, (dim, tensor) in enumerate(zip(scatter_dim, inputs)):
        assert (
            tensor.size(dim) % group_size == 0
        ), (
            f"input dimension {dim} ({tensor.size(dim)}) must be a multiple of "
            f"group_size {group_size} for tensor at index {idx}"
        )
        if dim != 0:
            tensor_list = torch.chunk(tensor, group_size, dim=dim)
            inputs[idx] = torch.cat(tensor_list)

    tensor_list = torch.ops._c10d_functional.reduce_scatter_tensor_coalesced(  # type: ignore[attr-defined]
        inputs,
        reduceOp.lower(),
        group_size,
        group_name,
    )

    return list(map(_maybe_wrap_tensor, tensor_list))


# This is a bit unsafe: it checks the schema annotation of the first argument
# to decide whether an op merely creates a view (aliases without writing).
def _is_view_op(tgt):
    assert isinstance(tgt, torch._ops.OpOverload)
    schema = tgt._schema
    if len(schema.arguments) > 0:
        first_arg = schema.arguments[0]
        # check if op is a view
        return first_arg.alias_info is not None and not first_arg.alias_info.is_write

def all_to_all_single(
    self: torch.Tensor,
    output_split_sizes: Optional[List[int]],
    input_split_sizes: Optional[List[int]],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Each process splits the input tensor and then scatters the split list
    to all processes in the group. It then concatenates the received tensors
    from all processes in the group and returns a single output tensor.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    if output_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in output_split_sizes
        ), output_split_sizes
    if input_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in input_split_sizes
        ), input_split_sizes
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    if output_split_sizes is None or input_split_sizes is None:
        assert output_split_sizes is None and input_split_sizes is None, (
            "output_split_sizes and input_split_sizes must either be "
            "specified together or both set to None"
        )
        output_split_sizes = [self.shape[0] // group_size] * group_size
        input_split_sizes = output_split_sizes
    tensor = torch.ops._c10d_functional.all_to_all_single(  # type: ignore[attr-defined]
        self,
        output_split_sizes,
        input_split_sizes,
        group_name,
    )
    return _maybe_wrap_tensor(tensor)

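# Illustrative sketch (not part of the upstream module): with equal splits both
# size lists may be None; with uneven splits each rank states how many rows it
# sends to and receives from every peer. The split lists are an input here,
# typically exchanged ahead of time, not something the op infers.
def _example_all_to_all_uneven(mesh: DeviceMesh) -> torch.Tensor:
    ws = mesh.size()
    rank = dist.get_rank()  # assumes the default group matches the mesh
    inp = torch.full((ws * (rank + 1),), float(rank))
    input_splits = [rank + 1] * ws  # rows this rank sends to each peer
    output_splits = [peer + 1 for peer in range(ws)]  # rows received from each peer
    return all_to_all_single(inp, output_splits, input_splits, mesh)

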
def all_to_all_single_autograd(
    self: torch.Tensor,
    output_split_sizes: Optional[List[int]],
    input_split_sizes: Optional[List[int]],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Same as all_to_all_single but supports autograd.
    """
    if output_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in output_split_sizes
        ), output_split_sizes
    if input_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in input_split_sizes
        ), input_split_sizes

    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    if output_split_sizes is None or input_split_sizes is None:
        assert output_split_sizes is None and input_split_sizes is None, (
            "output_split_sizes and input_split_sizes must either be "
            "specified together or both set to None"
        )
        output_split_sizes = [self.shape[0] // group_size] * group_size
        input_split_sizes = output_split_sizes
    tensor = torch.ops._c10d_functional_autograd.all_to_all_single(  # type: ignore[attr-defined]
        self,
        output_split_sizes,
        input_split_sizes,
        group_name,
    )
    return _FromTorchTensor.apply(tensor)

def permute_tensor(
    self: torch.Tensor,
    src_dst: List[int],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Permutes the elements of the tensor according to the given source/destination pairs. `src_dst` should
    be defined such that src_dst[m] == n means m sends to n.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh
    """
    t, rankset, group_size = _expand_group(group, tag)
    local_pg = c10d._find_or_create_pg_by_ranks_and_tag(t, rankset, group_size)

    output_split_sizes = [0] * group_size
    input_split_sizes = [0] * group_size
    for src, dst in enumerate(src_dst):
        if src == dist.get_rank(local_pg):
            input_split_sizes[dst] = self.numel()
        if dst == dist.get_rank(local_pg):
            output_split_sizes[src] = self.numel()

    return all_to_all_single(self, output_split_sizes, input_split_sizes, group, tag)

class AsyncCollectiveTensor(torch.Tensor):
    """
    A Tensor wrapper subclass that is used to trigger a call to wait
    prior to first use of the underlying tensor.
    Use it inside functional collective pytorch wrappers like the following:
    def functional_collective(self, group, tag):
        tag, rankset, group_size = _expand_group(group, tag)
        tensor = torch.ops.c10d_functional.{collective}(self, tag, rankset, group_size)
        return _maybe_wrap_tensor(tensor)
    """

    elem: torch.Tensor
    completed: bool

    __slots__ = ["elem", "completed"]

    @staticmethod
    def __new__(cls, elem: torch.Tensor):
        r = torch.Tensor._make_wrapper_subclass(  # type: ignore[attr-defined]
            cls,
            elem.size(),
            strides=elem.stride(),
            storage_offset=elem.storage_offset(),
            dtype=elem.dtype,
            layout=elem.layout,
            device=elem.device,
            requires_grad=elem.requires_grad,
        )
        r.elem = elem
        r.completed = False
        return r

    def __tensor_flatten__(self):
        return ["elem"], None

    def tolist(self):
        return self.trigger_wait().tolist()

    @staticmethod
    def __tensor_unflatten__(inner_tensors, meta, outer_size, outer_stride):
        assert meta is None
        elem = inner_tensors["elem"]
        return AsyncCollectiveTensor(elem)

    def __repr__(self):
        return f"AsyncCollectiveTensor({self.trigger_wait()})"

    def trigger_wait(self):
        if not self.completed:
            out = wait_tensor(self.elem)
            self.completed = True
            return out
        else:
            return self.elem

    def wait(self) -> torch.Tensor:
        return wait_tensor(self.elem)

    def _get_acs_underlying_tensor(self):
        """This method enables _functional_collectives_impl to test if a tensor is an ACS"""
        return self.elem

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        if func == torch.ops.aten.view.default:
            # Fast-path aten.view, since many view-related ops lower to it
            # eventually; this avoids the pytree slowdown below.
            res = func(args[0].elem, args[1])
            wrapper_res = AsyncCollectiveTensor(res)
            return wrapper_res

        is_view_op = _is_view_op(func)

        def unwrap(e: AsyncCollectiveTensor):
            # wait_tensor is idempotent and will do stream sync only once
            if not is_view_op:
                return e.trigger_wait()
            return e.elem

        def wrap(e: torch.Tensor):
            assert not isinstance(e, AsyncCollectiveTensor)
            res = AsyncCollectiveTensor(e)
            return res

        unwrapped_args = tree_map_only(AsyncCollectiveTensor, unwrap, args)
        unwrapped_kwargs = tree_map_only(AsyncCollectiveTensor, unwrap, kwargs)

        # we don't wrap the result as it doesn't need to be waited on.
        out = func(*unwrapped_args, **unwrapped_kwargs)

        # View ops don't require a sync, so we should re-wrap the outputs.
        if is_view_op:
            out = tree_map_only(torch.Tensor, wrap, out)

        return out

    def numpy(self):  # type: ignore[override]
        return self.wait().numpy()

def _expand_group(group: RANK_TYPES, tag: str = "") -> Tuple[str, List[int], int]:
    """
    _expand_group desugars the different RANK_TYPES types into a canonical format that is traceable.

    By having this be part of the explicit eager codepath, we avoid having to specialize behavior inside
    torchdynamo and can still interoperate with processgroup objects or other untraceable forms.
    """
    # The cast_* helpers are defined inside _expand_group to avoid dynamo graph
    # breaks: dynamo treats module-level `cast` calls as 'torch.*' ops and also
    # dislikes encountering 'typing' objects at runtime.
    if TYPE_CHECKING:

        def cast_listlistint(x):
            return cast(List[List[int]], x)

        def cast_listint(x):
            return cast(List[int], x)

    else:
        # fake cast op for use at runtime since dynamo doesn't support real cast
        def cast_listlistint(x):
            return x

        def cast_listint(x):
            return x

    rankset: List[int]
    if isinstance(group, list):
        if isinstance(group[0], list):
            nested_list = cast_listlistint(group)
            rankset = []
            group_size = -1
            for rs in nested_list:
                rankset.extend(rs)
                if group_size != -1 and group_size != len(rs):
                    raise ValueError(
                        f"group sizes must be identical found {group_size} and {len(rs)}"
                    )
                group_size = len(rs)
        else:
            rankset = cast_listint(group)
            group_size = len(rankset)
    elif isinstance(group, dist.ProcessGroup):
        rankset = dist.get_process_group_ranks(group)
        group_size = len(rankset)
        tag = tag or c10d._get_group_tag(group)
    elif isinstance(group, DeviceMesh):
        assert (
            group.ndim == 1
        ), "Only 1D mesh is supported, pass in (DeviceMesh, int) together if mesh > 1D"
        tag, rankset, _ = group._dim_group_infos[0]
        group_size = len(rankset)
    elif isinstance(group, tuple):
        if (
            len(group) == 2
            and isinstance(group[0], DeviceMesh)
            and isinstance(group[1], int)
        ):
            dmesh = group[0]
            dim = group[1]
            tag, rankset, _ = dmesh._dim_group_infos[dim]
            group_size = len(rankset)
        else:
            raise ValueError("Invalid tuple for group must be (DeviceMesh, int)")
    else:
        raise ValueError(
            "Invalid type for group, must be one of List, Processgroup, DeviceMesh or (DeviceMesh, int)."
        )

    return (tag, rankset, group_size)

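# Illustrative sketch (not part of the upstream module): the canonical triple
# _expand_group produces. A 2D rank list flattens into one rank set plus the
# per-collective group size, which is the form the legacy c10d_functional ops
# accept. Runs without any process group being initialized.
def _example_expand_group() -> None:
    tag, rankset, group_size = _expand_group([[0, 1], [2, 3]], tag="pairs")
    assert rankset == [0, 1, 2, 3]
    assert group_size == 2  # each sub-list runs its own collective of size 2
    assert tag == "pairs"

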
def _resolve_group_name(group: RANK_TYPES, tag: str = "") -> str:
    """
    Given group in RANK_TYPES, return the group name.
    """
    # `tag` will be deprecated; see pytorch/pytorch#93173.
    if isinstance(group, dist.ProcessGroup):
        return group.group_name
    elif isinstance(group, str):
        return group
    elif isinstance(group, DeviceMesh):
        assert (
            group.ndim == 1
        ), "Only 1D mesh is supported, pass in (DeviceMesh, int) together if mesh > 1D"
        return group._dim_group_infos[0][2]
    elif isinstance(group, tuple):
        if (
            len(group) == 2
            and isinstance(group[0], DeviceMesh)
            and isinstance(group[1], int)
        ):
            dmesh = group[0]
            dim = group[1]
            return dmesh._dim_group_infos[dim][2]
        else:
            raise ValueError("Invalid tuple for group must be (DeviceMesh, int)")
    elif isinstance(group, list):
        if not is_torchdynamo_compiling():
            warnings.warn(
                "The combination of ranks + tag as process group "
                "identifier has been deprecated. Please switch to "
                "using ProcessGroup, DeviceMesh, or group name instead.",
                FutureWarning,
                stacklevel=3,
            )
        return c10d._resolve_group_name_by_ranks_and_tag(cast(List[int], group), tag)
    else:
        raise ValueError(f"Unsupported group type: {type(group)}, {group}")

class _FromTorchTensor(torch.autograd.Function):
    """
    _FromTorchTensor allows autograd to propagate from a normal Tensor to an
    AsyncCollectiveTensor.
    """

    @staticmethod
    def forward(ctx, input: torch.Tensor) -> torch.Tensor:  # type: ignore[override]
        return _maybe_wrap_tensor(input)

    @staticmethod
    def backward(ctx, grad_output: torch.Tensor) -> torch.Tensor:  # type: ignore[override]
        return grad_output


def _are_we_tracing() -> bool:
    if is_torchdynamo_compiling():
        return True
    # If functionalization is turned on, we are almost definitely compiling/tracing.
    # (In particular, AOTAutograd traces a model once with functionalization on
    # but proxy tracing turned off, which is what this detects.)
    if (
        torch._C._get_dispatch_mode(torch._C._TorchDispatchModeKey.FUNCTIONAL)
        is not None
    ):
        return True
    return get_proxy_mode() is not None


def _maybe_wrap_tensor(self) -> torch.Tensor:
    if _are_we_tracing():
        return wait_tensor(self)
    res = AsyncCollectiveTensor(self)
    return cast(torch.Tensor, res)


def _all_gather_into_tensor_coalesced_meta(self, tag, rankset, group_size):
    def mk_out_tensor(shard):
        out_size = list(shard.size())
        out_size[0] *= group_size
        return shard.new_empty(out_size)

    return [mk_out_tensor(t) for t in self]


# Meta kernels so these ops can be traced without running a real collective.
def _broadcast_meta(self, *args):
    return torch.empty_like(self)


def _all_reduce_meta(self, *args):
    return torch.empty_like(self)


def _wait_tensor_meta(self, *args):
    return torch.empty_like(self)


def _all_gather_into_tensor_meta(shard, tag, rankset, group_size):
    out_size = list(shard.size())
    out_size[0] *= group_size
    return shard.new_empty(out_size)


def _reduce_scatter_tensor_meta(input, reduce_op, tag, rankset, group_size):
    out_size = list(input.size())
    out_size[0] //= group_size
    return input.new_empty(out_size)


def _all_reduce_coalesced_meta(self, *args):
    return [torch.empty_like(t) for t in self]


def _all_reduce__meta(inp, *args):
    return inp


def _broadcast__meta(inp, *args):
    return inp


def _all_reduce_coalesced__meta(inputs, *args):
    return inputs


def _reduce_scatter_tensor_coalesced_meta(inputs, reduceOp, tag, rankset, group_size):
    def mk_out_tensor(input):
        out_size = list(input.size())
        out_size[0] //= group_size
        return input.new_empty(out_size)

    return [mk_out_tensor(t) for t in inputs]


# NB: all_to_all is often described as having a dynamic output size, but this
# is not strictly true: typically the output_split_sizes are communicated
# ahead of time (which is dynamic) and then passed in explicitly, so the op
# itself just follows the specified splits.
def _all_to_all_single_meta(
    input, output_split_sizes, input_split_sizes, *args, **kwargs
):
    if output_split_sizes is None:
        return input.new_empty(input.size())
    else:
        for s in output_split_sizes:
            torch._check_is_size(s)
        out_size = list(input.size())
        out_size[0] = sum(output_split_sizes)
        return input.new_empty(out_size)


def _all_gather_into_tensor_out_native_meta(input, group_size, group_name, *, out):
    shape = list(input.size())
    shape[0] *= group_size
    return input.new_empty(shape)


def _all_gather_into_tensor_native_meta(input, group_size, group_name):
    shape = list(input.size())
    shape[0] *= group_size
    return input.new_empty(shape)


def _all_gather_into_tensor_coalesced_native_meta(inputs, group_size, group_name):
    return [
        _all_gather_into_tensor_native_meta(input, group_size, group_name)
        for input in inputs
    ]


def _reduce_scatter_tensor_native_meta(inp, reduce_op, group_size, group_name):
    shape = list(inp.size())
    shape[0] //= group_size
    return inp.new_empty(shape)


def _reduce_scatter_tensor_coalesced_native_meta(
    inputs, reduce_op, group_size, group_name
):
    return [
        _reduce_scatter_tensor_native_meta(inp, reduce_op, group_size, group_name)
        for inp in inputs
    ]


if not torch._running_with_deploy():
    # Library MUST be defined at module scope or it doesn't work. Creating a
    # "DEF" Library always crashes torch::deploy, so the Library instances are
    # created here, guarded against running inside it.
    lib_impl = torch.library.Library("_c10d_functional", "IMPL")
    lib_impl.impl("all_reduce", _all_reduce_meta, "Meta")
    lib_impl.impl("all_reduce_", _all_reduce__meta, "Meta")
    lib_impl.impl("all_reduce_coalesced", _all_reduce_coalesced_meta, "Meta")
    lib_impl.impl("all_reduce_coalesced_", _all_reduce_coalesced__meta, "Meta")
    lib_impl.impl("wait_tensor", _wait_tensor_meta, "Meta")
    lib_impl.impl(
        "all_gather_into_tensor_out", _all_gather_into_tensor_out_native_meta, "Meta"
    )
    lib_impl.impl("all_gather_into_tensor", _all_gather_into_tensor_native_meta, "Meta")
    lib_impl.impl(
        "all_gather_into_tensor_coalesced",
        _all_gather_into_tensor_coalesced_native_meta,
        "Meta",
    )
    lib_impl.impl("reduce_scatter_tensor", _reduce_scatter_tensor_native_meta, "Meta")
    lib_impl.impl(
        "reduce_scatter_tensor_coalesced",
        _reduce_scatter_tensor_coalesced_native_meta,
        "Meta",
    )
    lib_impl.impl("all_to_all_single", _all_to_all_single_meta, "Meta")
    lib_impl.impl("broadcast", _broadcast_meta, "Meta")
    lib_impl.impl("broadcast_", _broadcast__meta, "Meta")

    # Mark these ops as having side effects so they won't be removed by DCE.
    torch.fx.node.has_side_effect(torch.ops._c10d_functional.wait_tensor.default)
    torch.fx.node.has_side_effect(torch.ops._c10d_functional.wait_tensor)

    # Register legacy ops for backward compatibility.
    legacy_lib = torch.library.Library("c10d_functional", "DEF")
    legacy_lib_impl = torch.library.Library("c10d_functional", "IMPL")
    ops_defs = [
        "broadcast(Tensor self, int src, str tag, int[] ranks, int group_size) -> Tensor",
        "all_reduce(Tensor self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor",
        "all_reduce_coalesced(Tensor[] self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]",
        "wait_tensor(Tensor self) -> Tensor",
        "all_gather_into_tensor(Tensor shard, str tag, int[] ranks, int group_size) -> Tensor",
        "all_gather_into_tensor_coalesced(Tensor[] input, str tag, int[] ranks, int group_size) -> Tensor[]",
        "reduce_scatter_tensor(Tensor input, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor",
        "reduce_scatter_tensor_coalesced(Tensor[] inputs, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]",
        "all_to_all_single(Tensor input, SymInt[]? output_split_sizes, SymInt[]? input_split_sizes, str tag, int[] ranks, int group_size) -> Tensor",
    ]

    my_module = sys.modules[__name__]
    for op_def in ops_defs:
        op_name = op_def[0 : op_def.index("(")]
        backend_impl = getattr(fun_col_impl, f"_{op_name}")
        legacy_lib.define(op_def, tags=torch.Tag.pt2_compliant_tag)
        legacy_lib_impl.impl(op_name, backend_impl, "CompositeImplicitAutograd")

else:
    warnings.warn(
        "PyTorch Distributed functional collectives do not work with torch::deploy."
    )


"""
Dynamo remappings allow seamless translation from non-functional collectives of
supportable form into functional collective calls followed by inplace copy ops,
allowing them to be traced into a functional graph.

We implement this by writing a decomposition and teaching dynamo how to
associate it to a corresponding op via the mapping dict below.

These schemas intentionally match the torch.distributed.distributed_c10d.* ops
we are remapping from.
"""


def all_gather_tensor_inplace(
    output_tensor: torch.Tensor,
    input_tensor: torch.Tensor,
    group,  # TODO add a type
    async_op: bool = False,
    tag: str = "",
    gather_dim: int = 0,
):
    assert (
        not async_op
    ), "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output_tensor.copy_(all_gather_tensor(input_tensor, gather_dim, group, tag))


def reduce_scatter_tensor_inplace(
    output: torch.Tensor,
    input: torch.Tensor,
    op: str = "sum",  # TODO type is actually c10d ReduceOp. is this ok?
    group=None,  # TODO add a type
    async_op: bool = False,
    scatter_dim: int = 0,
    tag: str = "",
):
    assert (
        not async_op
    ), "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output.copy_(reduce_scatter_tensor(input, op, scatter_dim, group, tag))


REDUCE_OP_TO_STR = {
    dist.ReduceOp.SUM: "sum",
    dist.ReduceOp.AVG: "avg",
    dist.ReduceOp.PRODUCT: "product",
    dist.ReduceOp.MIN: "min",
    dist.ReduceOp.MAX: "max",
    dist.ReduceOp.BAND: "band",
    dist.ReduceOp.BOR: "bor",
    dist.ReduceOp.BXOR: "bxor",
}


def all_reduce_inplace(
    tensor: torch.Tensor,
    op: str = "sum",
    group=None,
    async_op: bool = False,
    tag: str = "",
):
    assert (
        not async_op
    ), "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return tensor.copy_(all_reduce(tensor, op, group, tag))


def all_to_all_inplace(
    output: torch.Tensor,
    input: torch.Tensor,
    output_split_sizes=None,
    input_split_sizes=None,
    group=None,
    async_op=False,
    tag: str = "",
):
    assert (
        not async_op
    ), "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output.copy_(
        all_to_all_single(input, output_split_sizes, input_split_sizes, group, tag)
    )


def all_gather_inplace(
    tensor_list: List[torch.Tensor],
    tensor: torch.Tensor,
    group=None,
    async_op=False,
    tag: str = "",
):
    assert (
        not async_op
    ), "Can't remap async version of inplace op to functional collective"
    assert all(
        t.size(0) == tensor.size(0) for t in tensor_list
    ), "Remapping variable size all_gather is not yet supported"

    group = group or dist.group.WORLD
    assert group is not None

    output = all_gather_tensor(tensor, 0, group, tag)

    # Use aten.slice instead of aten.getitem to avoid replaying views.
    output_splits = []
    offset = 0
    for t in tensor_list:
        output_splits.append(output[offset : offset + t.size(0)])
        offset += t.size(0)
    for dst, src in zip(tensor_list, output_splits):
        dst.copy_(src)
    return tensor_list


from torch.distributed.distributed_c10d import (
    _all_gather_base as legacy_all_gather_base,
    _reduce_scatter_base as legacy_reduce_scatter_base,
    all_gather as legacy_all_gather,
    all_gather_into_tensor as legacy_allgather,
    all_reduce as legacy_allreduce,
    all_to_all_single as legacy_all_to_all_single,
    reduce_scatter_tensor as legacy_reducescatter,
)

# This dict maps legacy ProcessGroup collectives to their functional
# counterparts; dynamo consults it when tracing code that calls the legacy
# c10d ops.
traceable_collective_remaps = {
    legacy_allgather: all_gather_tensor_inplace,
    legacy_allreduce: all_reduce_inplace,
    legacy_all_to_all_single: all_to_all_inplace,
    legacy_all_gather: all_gather_inplace,
    legacy_reducescatter: reduce_scatter_tensor_inplace,
    legacy_all_gather_base: all_gather_tensor_inplace,
    legacy_reduce_scatter_base: reduce_scatter_tensor_inplace,
}
