
    ɯwgT\                    r   d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZ d dlZddlmZmZ ddlmZ ddlmZmZmZmZmZmZmZ ej4                  j7                  ed	      Zerdd
lmZ ddZ ddZ!	 	 	 	 ddZ"	 	 	 	 	 	 	 	 	 	 ddZ#	 	 	 	 ddZ$ddZ%d Z&d Z'	 	 	 	 ddZ(ddZ)d Z*	 	 	 	 	 	 	 	 ddZ+y)    )annotationsN)defaultdict)DictListSetTYPE_CHECKING   )configir)WeakDep)contains_collectivecontains_waitfind_recursive_deps_of_nodefind_recursive_users_of_nodeis_collectiveis_fallback_opis_waitoverlap)BaseSchedulerNodec                     t        | ddd      S )z7
    Greedily schedules waits as late as possible.
    FTraise_comms
sink_waitsreorder_for_overlap_schedule_for_commsnodess    Z/home/mcse/projects/flask/flask-venv/lib/python3.12/site-packages/torch/_inductor/comms.pyr   r       s     Ed     c                     t        | ddd      S )z8
    Greedily schedules comms as early as possible.
    TFr   r   r   s    r   r   r   )   s     DU r    c                     t        | ddd      S )a  
    This achieves the following overall scheduling procedure:
        Step 1: Given that we've currently scheduled comm N, we now schedule all compute nodes
            that are required for comm N + 1 but do not depend on comm N, to run at the same time with comm N.
        Step 2: If all those compute nodes are sufficient to overlap comm N, we're done.
            Otherwise, we now need to look elsewhere to find compute that overlaps with comm N.
            We prioritize compute nodes that are needed sooner.
        Step 3: We schedule the compute nodes dependent on comm N and required for comm N + 1.
        Step 4: We schedule comm N + 1.
        Repeat this for subsequent comm nodes.
    Tr   r   r   s    r   reorder_compute_for_overlapr#   2   s     DTt r    c           	     @   i }i i i i ct        |       D ]y  \  }}|j                         D ]  }|||<   	 |j                         D ]  }||<   	 ||j                         <   |j                         }	t        j
                  |	<   d|	<   ||	<   { d}
| D ]  }|rZt        |      rO|
|j                         <   |j                  D ]'  }|   j                         }t        |   |
      |<   ) |
dz  }
_|sbt        |      snd|j                         <     G fdd      | D ci c]'  }||j                  D ch c]  }|j                   c}) c}}g t        t              | D ci c]  }|t        |       c}j                         D ]J  \  }}t!        |      dk(  rt#        j$                   |             |D ]  }|   j'                  |        L g fdfdfd}t!              rIt#        j(                        j*                  }|rt        |      r	 ||       n |       t!              rIj                         D ]  \  }}t!        |      dk(  rJ d         S c c}w c c}}w c c}w )	a  
    Schedule `snodes` for various comm optimization objectives.

    Args:
        snodes: the nodes to be scheduled.
        raise_comms: whether to greedily schedule collectives as early as possible
        sink_wait: whether to greedily schedule waits as late as possible
        reorder_compute_for_overlap: whether to reorder compute nodes to
            optimize for compute/communication overlapping.

    Returns:
        The new schedule order.

    Some notes on the synergy between different options:
        - `raise_comms` provides more overlapping oppurtunies for `reorder_compute_for_overlap`.
        - When both `raise_comms` and `sink_waits` is `True`, `raise_comms` is prioritized.
    r   r	   c                  &    e Zd Zd fdZd Zy)$_schedule_for_comm.<locals>.Runnablec                    || _         t        t        |j                                     }|   j	                         }|   |   |   f| _        y N)snodenextiterget_operation_namesget_namescore)selfr)   name
fused_namename_to_fused_nodescores_0scores_1scores_2s       r   __init__z-_schedule_for_comm.<locals>.Runnable.__init__   sV    DJU6689:D+D1::<J$$$DJr    c                4    | j                   |j                   k  S r(   r.   )r/   others     r   __lt__z+_schedule_for_comm.<locals>.Runnable.__lt__   s    ::++r    N)returnNone)__name__
__module____qualname__r6   r:   )r2   r3   r4   r5   s   r   Runnabler&      s    	 		,r    r@   c                    j                  |        | j                         D ]N  }|   D ]D  } |    j                  |       t        |          dk(  s)t	        j
                   |              F P y)zU
        Schedules `snode` and put all unblocked nodes onto the ready queue.
        r   N)appendget_buffer_namesremovelenheapqheappush)r)   buf_namer@   buffer_usersready	scheduled
unmet_depss     r   schedulez$_schedule_for_comm.<locals>.schedule   sv     	..0 	;H%h/ ;5!((2z%()Q.NN5(5/:;	;r    c                     D  cg c].  } t        | j                        st        | j                        s| 0 }} t        |      dk(  ryt	        |d       S c c} w )zh
        Return the next node in the ready queue that's neither a collective or
        a wait.
        r   Nc                    | j                   S r(   r8   xs    r   <lambda>zG_schedule_for_comm.<locals>.get_overlapping_candidate.<locals>.<lambda>   s
    QWW r    key)r   r)   r   rE   min)rQ   
candidatesrJ   s     r   get_overlapping_candidatez5_schedule_for_comm.<locals>.get_overlapping_candidate   s]     
&qww/agg8N 

 

 z?a:#455
s   3Ac                   t        |       sJ  |        |    }|dkD  rM        x}Dj                  |        |j                         ||j                     z  }|dkD  r
        x}Dt        j                         y)z
        Schedules collective node `snode`, along with one or more compute nodes
        to overlap with it. The strategy is described in the comment of
        `reorder_compute_for_overlap`.
        r   N)r   rD   r)   rF   heapify)r)   collective_cost	candidaterW   rJ   rM   snode_to_costs      r   schedule_collective_for_overlapz;_schedule_for_comm.<locals>.schedule_collective_for_overlap   s     #5)))'.a799FLL#Y__%}Y__==O a799F
 	er    z;Detected unscheduled nodes. Nodes with unmet dependencies: )	enumeraterC   r,   r-   sysmaxsizer   	ancestorsrU   r   unmet_dependenciesr0   r   setestimate_op_runtimeitemsrE   rF   rG   addheappopr)   )r   r   r   r   buf_name_to_snodeidxr)   rH   op_name	node_namecomm_idxancanc_fused_namedepdepsr]   r@   rI   rW   r2   rJ   rM   rK   r3   r4   r5   r\   rL   s                   @@@@@@@@@@@@r   r   r   E   s   L #%r2 Hh' "
U..0 	0H*/h'	0 002 	0G*/w'	0/45>>+,NN$	!kk!" H +.u5)1HU^^%& S!3C!8!A!A!C+.x/G+R(S MHM%0)*HU^^%&+, , LR5BGE$<$<=S==5J E6A#6FLDJK5U/66KM!'') )tt9>NN5(5/2 	)C!!%(	)) I	; 	;6& e*e$**#6u#=+E2UO e* "'') 
t4yA~ 	
..8\;	
~

 O >5 Ls   :JJ!J?JJc                v   t        d | D              rt        | ||      } | D cg c]  }t        |      s| }}t        dt	        |            D ]a  }t        t        ||   j                                     }||dz
     j                         D ]!  }||   j                  t        ||             # c | S c c}w )z
    Decide global ordering of comms, by just enforcing the ordering that's in the input graph
    (might not be the same ordering as the eager mode program).
    TODO: Come up with a better approach
    c              3     K   | ]t  }t        |j                  t        j                  j                  j
                  j                  t        j                  j                  j                  j                  h       v y wr(   )r   nodetorchopsfsdpall_gather_copy_indefault	chunk_cat).0rQ   s     r   	<genexpr>z2decide_global_ordering_of_comms.<locals>.<genexpr>   sY      	  	FF		1199		((00	
	s   A:A<r	   mutating_buf)
anyenforce_comm_ordering_for_fsdpr   rangerE   r*   r+   rC   add_fake_depr   )nodesname_to_bufr2   n
comm_nodesir}   bufs           r   decide_global_ordering_of_commsr      s      	 	 	 /ukCUV"=&9!&<!=J=1c*o& PDA!?!?!ABCa!e$557 	PCqM&&ws'NO	PP L >s
   B6B6c                    t         j                  dk(  r| j                         }|S t        t         j                        sJ t        j                  |       }|S )z:
    Returns estimated op runtime in nanoseconds (ns)
    rx   )r
   rd   get_estimated_runtimecallable)r)   runtimes     r   rd   rd      sR     !!Y.--/ N 22333,,U3Nr    c                l   d}t        | j                  t        j                        rd| j                  j                   d}d}t        | j                  d      rt        | j                  j                  d      ret        | j                  j                  d      rEd| j                  j                  j                   d| j                  j                  j                   d}d}t        | j                  d	      r| j                  j                  }| j                  j                  j                   | | d| dS )
N z ()layoutsizestridez (size=z	, stride=r0   )
isinstancers   r   ExternKernelOutpython_kernel_namehasattrr   r   r   r0   	__class__r=   )r)   detailout_tensor_infork   s       r   node_summaryr     s    F%**b001ejj334A6O

H%EJJ%%v.EJJ%%x0 ejj'',,-Yuzz7H7H7O7O6PPQR 	 Iuzz6"JJOO	jj""++,VH_4ER	{RSTTr    c                   d}d }| D ]  }|tt        |      r|t        |      z  }|j                  }n.t        |j                        rt	        d      |t        |      z  }t
        j                  t        |              yt        |      rt	        d      t        |j                        r"t
        j                  t        |              d }t
        j                  dt        |               t
        j                  d|dz  dz          y )Ng        z8Wait is not expected when there is no collective runningzkFound two collectives running at the same time. `visualize_overlap` needs to be updated to handle this casez| zEst. runtime (ms): i  )r   rd   rs   r   AssertionErroroverlap_logdebugr   )ordertotal_est_runtimecur_comm_noder)   s       r   visualize_overlapr     s   "M > "5)!%8%??! %

$$N  "%8%??!e!4 57"5)$R  $!!\%%8$9; $!!B|E':&;"<=->. 
/$6=>?r    c                   | }t         j                  D ]  }t        |t              r|t	               v rt	               |   }t
        j                  j                         dk(  r%t        j                  d| d       	 t        |        ||      }t
        j                  j                         dk(  st        j                  d| d       	 t        |        |S # t        $ r(}t        j                  t        |             Y d }~d }~ww xY w# t        $ r)}t        j                  t        |             Y d }~&d }~ww xY w)Nr   z.==== Visualize overlap before reordering pass z ====z-==== Visualize overlap after reordering pass )r
   'reorder_for_compute_comm_overlap_passesr   strglobalsrt   distributedget_rankr   r   r   	Exception)r   r   pes       r   $reorder_compute_and_comm_for_overlapr   =  s    E;; *a!wy.	!A%%'1,@5I*!%( %%%'1,?s%H*!%(#*( L  *!!#a&))*  *!!#a&))*s0   :C	D	D	!DD		D>D99D>c                  	 	 dd l 		j                  j                         sJ 	j                  j                  j
                  r 	j                  j                  j                  sJ 	 ddl
m}m}m}m}m} 	 	fd} |       } | |	j                  j                  j
                  j                    |t"        j$                   |	j                  j&                  j(                  j                    |d       |d       |d       |d       |d	       |d
       |d             |d             |d       |d            |d       d	fd       } ||        |j+                  |        y # t        t        t        f$ r Y y w xY w)Nr   r	   )CallFunction
KeywordArgMatchPatternMatcherPassregister_graph_patternc                J   t        | j                        }|D ]  }|j                  t        j                  k(  s!|j
                  d   j                  j                  j                  j                  j                  u se|j
                  d   dk(  sx| j                  |        y )Nr   r	   )listr   targetoperatorgetitemargsru   rv   rw   rx   
erase_node)g	node_listr   rt   s      r   remove_unused_getitemz8reinplace_fsdp_all_gather.<locals>.remove_unused_getitem~  su    M	 	 AH,,,FF1I$$		(I(I(Q(QQFF1INQ	 r    all_gather_inputsinp_split_sizesall_gather_input_numel
world_sizerankdtypedeviceitem_idx
group_size
group_namec                &    | j                   d   dk(  S )Nr   r   )kwargs)matchs    r   rR   z+reinplace_fsdp_all_gather.<locals>.<lambda>  s    %,,z":a"? r    )	pass_dictextra_checkc                |    fd}| j                  ||d   |d   |d   |d   |d   |d   |d   |d	   |d
   g	       y )Nc                     | d d }| d   }| d   } j                   j                  j                  j                  | }|d   }|d   }j                   j                  j
                  j                  ||||      }|S )Nr   r	   )out)ru   rv   rw   rx   _c10d_functionalall_gather_into_tensor_out)	r   copy_in_argsr   r   rw   r   	getitem_1all_gather_into_tensorrt   s	           r   replzEreinplace_fsdp_all_gather.<locals>.reinplace_all_gather.<locals>.repl  s      9LbJbJ!J!B!B!J!J" )+G*1-I		**EEMMZ N  #
 *)r    r   r   r   r   r   r   r   r   r   )replace_by_example)r   r   r   r   rt   s       r   reinplace_all_gatherz7reinplace_fsdp_all_gather.<locals>.reinplace_all_gather  si    0	*$ 	  *+()/0|$vwx |$|$
	
r    )r   r   )4torch.distributed._composable.fsdp._fsdp_collectivesr   is_availableru   r   r   r   ImportErrorAttributeErrorr   pattern_matcherr   r   r   r   r   rx   r   r   rv   rw   apply)
graphr   r   r   r   r   r   
graph_passr   rt   s
            @r   reinplace_fsdp_all_gatherr   Y  ss   
C  --/// II&&==		**EE	
FE
  	  $%JII&&==EE  IINN55==230178|,v&w'x(	 :& |$|$#	
& ?+. 
/. 
D % UE 8 s   A"E( (E?>E?c                    t        | t        j                  j                  j                  t        j                  j                  j
                  f      rJ t        | j                         dd        S )N   )r   rt   	_inductor	schedulerFusedSchedulerNodeGroupedSchedulerNodeintr-   )r)   s    r   
get_op_idxr     s]    OO%%88OO%%::	
   u~~#$$r    c           	     	   ! ddl m  g }t               }d}d}i }i }i ! !fd}	| D ]&  }
t        |
j                  t
        j                  j                  j                  j                        rt        fd|
j                  D              rd}|
}t               }t        |||       t
        j                  j                  j                  j                  t
        j                  j                  j                  j                  t
        j                  j                  j                  j                  t
        j                  j                   j"                  j$                  ht'        ||| fd	       t)        |d
       }t+        |      }d}t-        t+        |            D ]W  }||   }t/        |j                  t
        j                  j                  j                  j                        r|dz  }|dkD  sU|} n |d | }d }t-        t+        |      dz
        D ]3  }t1        ||dz      j                  t2        j4                        s.|dz   } n |J  |	|d |       } |	||d        }|||<   Kt/        |
j                  t
        j                  j                  j6                  j                        sd}|
}t               }t'        |||       t)        |d       }d }t-        t+        |      dz
        D ]3  }t1        ||dz      j                  t2        j4                        s.|dz   } n |J  |	|d |       } |	||d        }|||<   ) t+        !      dkD  sJ |rt+        |      dkD  sJ |rt+        |      dkD  sJ | D ]N  }
|
j9                         !v r!|
j9                            }
|
|v r-|j;                  |
       |j=                  |
       P d }|j?                         D ]j  \  }}|atA        tC        |jE                                     }|jG                         D ],  }|jI                  tK        |j9                         |             . |}l d }|j?                         D ]j  \  }}|atA        tC        |jE                                     }|jG                         D ],  }|jI                  tK        |j9                         |             . |}l |S )Nr	   )r   Fc                    j                   j                  |       }| D ]  }||j                         <    ||j                         <   |S r(   )r   creater-   )snodes_to_group
group_noder)   r   snode_name_to_final_snodes      r   _create_group_nodez:enforce_comm_ordering_for_fsdp.<locals>._create_group_node  sV    33::?K
$ 	EE:D%enn&67	E;E!*"5"5"78r    )opc              3     K   | ]I  }t        |   j                  t        j                  j                  j
                  j                         K y wr(   )r   rs   rt   ru   rv   rw   rx   )rz   rQ   r2   s     r   r{   z1enforce_comm_ordering_for_fsdp.<locals>.<genexpr>  sD      
  "1%**EIINN,M,M,U,U
s   AATc                    t        | j                        xs0 t        | j                        xr | j                  j                  v  S r(   )r   NopKernelSchedulerNodeExternKernelSchedulerNoders   op_overload)rQ   allowed_opsr   s    r   rR   z0enforce_comm_ordering_for_fsdp.<locals>.<lambda>  sF    q)"B"BC "1i&I&IJ >FF..+=	' r    )criteria_cbc                    t        |       S r(   r   rP   s    r   rR   z0enforce_comm_ordering_for_fsdp.<locals>.<lambda>  
    JqM r    rS   r   c                    t        |       S r(   r   rP   s    r   rR   z0enforce_comm_ordering_for_fsdp.<locals>.<lambda>H  r   r    r|   )&r   r   rc   r   rs   rt   ru   r   r   rx   r~   ra   r   wait_tensorrv   split_with_sizes_copyatenset_source_Tensorr   sortedrE   r   r   r   r   _WaitKernelry   r-   rB   rf   re   r*   r+   rC   get_outputsr   r   )"r   r   r2   	new_orderrK   	ag_exists	rs_exists$ag_grouped_node_to_wait_grouped_node$rs_grouped_node_to_wait_grouped_noder   r)   ag_snodeag_related_snode_setag_related_snodesend_idx_of_current_ag_blockcopy_out_countr   	cur_snodewait_node_idxag_group_nodeag_wait_group_noders_snoders_related_snode_setrs_related_snodesrs_group_noders_wait_group_nodeprev_ag_waitwait_group_noder}   oprev_rs_waitr   r   r   s"     `                            @@@r   r   r     s   
 )+IIII+-(+-( "  mUJJ59955PPXX
 
 __	
 
 IHEHU  ($"	 		**EEMM		**66>>		44<<		##11	K )$" !'$*A! +..?*@'N3012 -a0	!NNEIINN$H$H$P$P #a'N!A%23/ !22N3N O !M301A56 /A6;;R^^L$%EM !,,,./@-/PQM "44Emn4U!VBT0? EJJ		(@(@(H(HIIH FIU ($"	 !'$*A!
 !M301A56 /A6;;R^^L$%EM !,,,./@-/PQM "44Emn4U!VBT0?[mU^ ()A---781<<<781<<<  >>88-enn.>?EIe L*N*T*T*V '&#]%C%C%E FGL!--/ **AJJL|D '' L*N*T*T*V '&#]%C%C%E FGL!--/ **AJJL|D '' r    )r   List[BaseSchedulerNode]r;   r  )
r   r  r   boolr   r  r   r  r;   r  )r   r  r;   r  )r)   r   r;   float)r   ztorch.fx.Graphr;   r<   )r   1List[torch._inductor.scheduler.BaseSchedulerNode]r   z4Dict[str, torch._inductor.scheduler.SchedulerBuffer]r2   zDict[str, BaseSchedulerNode]r;   r  ),
__future__r   rF   r   r_   collectionsr   typingr   r   r   r   rt   r   r
   r   dependenciesr   utilsr   r   r   r   r   r   r   _logginggetArtifactLoggerr=   r   r   r   r   r   r#   r   r   rd   r   r   r   r   r   r    r    r   <module>r(     s    #   
 # 1 1   !   nn..xC,#&W#WW W 	W
 Wt"@	U&>#8l^%m=mEm 5m 7	mr    