
    Ǆgj                        d dl Zd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZmZ g d	Z ed
d      Z ed       G d de             Z ed       G d de             Z G d de      Z d Z! G d dee       Z" G d de      Z# ed       G d de             Z$ G d dee       Z% ed       G d  d!e             Z& ed"       G d# d$eee                   Z'y)%    N)ABCabstractmethod)deque)
AnyCallableDequeIteratorListLiteralOptionalSizedTupleTypeVar)functional_datapipe)_SnapshotState)IterDataPipe)_check_unpickable_fnStreamWrapper)ConcaterIterDataPipeDemultiplexerIterDataPipeForkerIterDataPipeMultiplexerIterDataPipeZipperIterDataPipe_T_coT)	covariantconcatc                   F    e Zd ZU dZee   ed<   defdZdefdZ	de
fdZy)r   aN  
    Concatenates multiple Iterable DataPipes (functional name: ``concat``).

    The resulting DataPipe will yield all the elements from the first input DataPipe, before yielding from the subsequent ones.

    Args:
        datapipes: Iterable DataPipes being concatenated

    Example:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> import random
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp1 = IterableWrapper(range(3))
        >>> dp2 = IterableWrapper(range(5))
        >>> list(dp1.concat(dp2))
        [0, 1, 2, 0, 1, 2, 3, 4]
    	datapipesc                 ~    t        |      dk(  rt        d      t        d |D              st        d      || _        y )Nr   z/Expected at least one DataPipe, but got nothingc              3   <   K   | ]  }t        |t                y wN
isinstancer   .0dps     q/home/mcse/projects/flask_80/flask-venv/lib/python3.12/site-packages/torch/utils/data/datapipes/iter/combining.py	<genexpr>z0ConcaterIterDataPipe.__init__.<locals>.<genexpr>>        DB:b,/D   z(Expected all inputs to be `IterDataPipe`)len
ValueErrorall	TypeErrorr   selfr   s     r'   __init__zConcaterIterDataPipe.__init__;   s;    y>QNOOD)DDFGG"    returnc              #   F   K   | j                   D ]  }|E d {     y 7 wr!   )r   )r0   r&   s     r'   __iter__zConcaterIterDataPipe.__iter__B   s#     .. 	BMM	s   !!c                     t        d | j                  D              rt        d | j                  D              S t        t	        |       j
                   d      )Nc              3   <   K   | ]  }t        |t                y wr!   r#   r   r$   s     r'   r(   z/ConcaterIterDataPipe.__len__.<locals>.<genexpr>G        >z"e$>r*   c              3   2   K   | ]  }t        |        y wr!   r+   r$   s     r'   r(   z/ConcaterIterDataPipe.__len__.<locals>.<genexpr>H        82s2w8   # instance doesn't have valid length)r-   r   sumr.   type__name__r0   s    r'   __len__zConcaterIterDataPipe.__len__F   G    >t~~>>8888tDz2233VWXXr2   N)rA   
__module____qualname____doc__r   r   __annotations__r1   r	   r5   intrC    r2   r'   r   r   %   s:    $ \""#< #( Y Yr2   r   forkc                   :    e Zd ZdZ	 	 d	dedededeed      fdZy)
r   a  
    Creates multiple instances of the same Iterable DataPipe (functional name: ``fork``).

    Args:
        datapipe: Iterable DataPipe being copied
        num_instances: number of instances of the datapipe to create
        buffer_size: this restricts how far ahead the leading child DataPipe
           can read relative to the slowest child DataPipe.
           Defaults to ``1000``. Use ``-1`` for the unlimited buffer.
        copy: copy strategy to use for items yielded by each branch. Supported
            options are ``None`` for no copying, ``"shallow"`` for shallow object
            copies, and ``"deep"`` for deep object copies. Defaults to ``None``.

    Note:
        All branches of the forked pipeline return the identical object unless
        the copy parameter is supplied. If the object is mutable or contains
        mutable objects, changing them in one branch will affect all others.

    Example:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper(range(5))
        >>> dp1, dp2 = source_dp.fork(num_instances=2)
        >>> list(dp1)
        [0, 1, 2, 3, 4]
        >>> list(dp2)
        [0, 1, 2, 3, 4]
    Ndatapipenum_instancesbuffer_sizecopyshallowdeepc                     |dk  rt        d| d      |dk(  r|S t        ||||      }t        |      D cg c]  }t        ||       c}S c c}w N   z,Expected `num_instances` larger than 0, but z	 is found)r,   _ForkerIterDataPiperange_ChildDataPipe)clsrM   rN   rO   rP   	containeris          r'   __new__zForkerIterDataPipe.__new__l   sd     1>}oYW  AO'-dS	6;M6JKy!,KKKs   A  N)	rA   rE   rF   rG   r   rI   r   r   r]   rJ   r2   r'   r   r   M   sK    B  59LL L 	L
 w012Lr2   r   c                   d    e Zd ZdZedefd       Zedefd       Zed	d       Z	edefd       Z
y)
_ContainerTemplatezVAbstract class for container ``DataPipes``. The followings are three required methods.instance_idc                      y r!   rJ   r0   rb   s     r'   get_next_element_by_instancez/_ContainerTemplate.get_next_element_by_instance       r2   r3   c                      y r!   rJ   rB   s    r'   is_every_instance_exhaustedz._ContainerTemplate.is_every_instance_exhausted   rf   r2   Nc                      y r!   rJ   rB   s    r'   resetz_ContainerTemplate.reset   rf   r2   c                      y)zSRaise TypeError if it's not supposed to be implemented to support `list(datapipe)`.NrJ   rd   s     r'   get_length_by_instancez)_ContainerTemplate.get_length_by_instance   s    r2   r3   N)rA   rE   rF   rG   r   rI   re   boolrh   rj   rl   rJ   r2   r'   ra   ra   }   sk    a   T     c# c cr2   ra   c                     | S r!   rJ   )xs    r'   _no_oprq      s    Hr2   c                       e Zd ZdZ	 	 ddedededeed      fdZd	 Z	d
efdZ
defdZd
edefdZddZd Zd Zd Zd Zy)rW   z
    Container to hold instance-specific information on behalf of ForkerIterDataPipe.

    It tracks the state of its child DataPipes, maintains the buffer, and yields the next value
    as requested by the child DataPipes.
    NrM   rN   rO   rP   rQ   c                    || _         d | _        || _        t               | _        || _        | j
                  dk  rt        j                  dt               |t        | _
        nE|dk(  rt        j                  | _
        n*|dk(  rt        j                  | _
        nt        d| d      dg|z  | _        d| _        d| _        d | _        t'        |      D cg c]  }d c}| _        y c c}w )Nr   zPUnlimited buffer size is set for `fork`, please be aware of OOM at random placesrR   rS   zUnknown copy method `z5` requested, choose one of None, `shallow` or `deep`.T)main_datapipe_datapipe_iteratorrN   r   bufferrO   warningswarnUserWarningrq   copy_fn
copymodulerP   deepcopyr,   child_pointersslowest_ptrleading_ptrend_ptrrX   _child_stop)r0   rM   rN   rO   rP   _s         r'   r1   z_ForkerIterDataPipe.__init__   s     &;?*"W&aMM:
 <!DLY%??DLV^%..DL'v-bc 
 *
* &*6;M6J'K'K'Ks   	C'c                 ,    t        | j                        S r!   r+   rt   rB   s    r'   rC   z_ForkerIterDataPipe.__len__       4%%&&r2   rb   c              #     K   | j                   g| j                  |   rXt        | j                        | _         t        j
                  | _        t        | j                        D ]  }d| j                  |<    	 | j                  |   s| j                  |xx   dz  cc<   | j                  -| j                  |   | j                  k(  rd| j                  |<   nu| j                  rK| j                  |   | j                  k  r/| j                  |   | j                  z
  dz
  }| j                  |   }nE| j                  |   | _        	 t        | j                         }| j                  j                  |       | j                  |   | j                  dz   k(  rEt#        | j                        }| j                  |k  r!|| _        | j                  j%                          | j&                  dk\  rB| j                  | j&                  | j                  z   kD  rt)        dd| j&                   dz         | j+                  |       | j                  |   sd| j                  |<   t-        | j                        rd | _         | j/                          y y # t         $ r+ d| j                  |<   d | _         | j                  | _
        Y Ew xY w# d| j                  |<   t-        | j                        rd | _         | j/                          w w xY ww)NFrV   Tr   z#ForkerIterDataPipe buffer overflow,zbuffer size  is insufficient.)ru   r   iterrt   r   	Iterating_snapshot_staterX   rN   r}   r   rv   r   r~   nextappendStopIterationminpopleftrO   BufferErrorrz   r-   _cleanup)r0   rb   r\   idx
return_valnew_mins         r'   re   z0_ForkerIterDataPipe.get_next_element_by_instance   s    ""*t/?/?/L&*4+=+=&>D##1#;#;D 4--. ,&+  #,-	 &&{3##K0A50LL,++K8DLLH48D$$[1;;4#6#6{#CtGWGW#W--k:T=M=MMPQQC!%S!1J'+':':;'GD$!%)$*A*A%B
**:6 &&{3t7G7G!7KK!++G '''1+2(++-$$)((4+;+;d>N>N+NN%=()9)9(::KLM 
 ll:..M &&{3P -1D[)4##$*.' %3 ) !8<((526/'+'7'7 	!. -1D[)4##$*.' %sD   A4L7CK  0J	 3CK  =L	0J=9K  <J==K   >K>>Lr3   c                 L    | j                   d uxr t        | j                        S r!   )r   r-   r   rB   s    r'   rh   z/_ForkerIterDataPipe.is_every_instance_exhausted   s!    ||4'AC0@0@,AAr2   c                 ,    t        | j                        S r!   r   rd   s     r'   rl   z*_ForkerIterDataPipe.get_length_by_instance   r   r2   c                     d | _         t               | _        dg| j                  z  | _        d| _        d| _        d | _        t        | j                        D cg c]  }d c}| _	        y c c}w Nr   T)
ru   r   rv   rN   r}   r~   r   r   rX   r   r0   r   s     r'   rj   z_ForkerIterDataPipe.reset   s`    "&g cD$6$66*/0B0B*CDQDDDs   	A*c                     | j                   | j                  | j                  | j                  | j                  | j
                  f}t        j                  t        j                  |      S |S r!   )rt   rN   rO   rz   _valid_iterator_id_number_of_samples_yieldedr   getstate_hookr0   states     r'   __getstate__z _ForkerIterDataPipe.__getstate__	  sa    LL##++
 %%1--e44r2   c                 ,   |\  | _         | _        | _        | _        | _        | _        d | _        t               | _        dg| j                  z  | _	        d| _
        d| _        d | _        t        | j                        D cg c]  }d c}| _        y c c}w r   )rt   rN   rO   rz   r   r   ru   r   rv   r}   r~   r   r   rX   r   r0   r   r   s      r'   __setstate__z _ForkerIterDataPipe.__setstate__  s     	
L#+"&g cD$6$66*/0B0B*CDQDDDs   ?	Bc                     | j                   r=| j                   j                         }t        j                  |       | j                   r<y y r!   )rv   r   r   close_streams)r0   ds     r'   r   z_ForkerIterDataPipe._cleanup'  s2    kk##%A''* kkr2   c                 $    | j                          y r!   r   rB   s    r'   __del__z_ForkerIterDataPipe.__del__,      r2   r^   rm   )rA   rE   rF   rG   r   rI   r   r   r1   rC   re   rn   rh   rl   rj   r   r   r   r   rJ   r2   r'   rW   rW      s      59#L#L #L 	#L
 w012#LJ'3  3 jBT B'# '# 'EE"+
r2   rW   c                   T    e Zd ZU dZdZeed<   dedefdZ	d Z
d Zd	efd
Zd	efdZy)rY   a  
    Iterable Datapipe that is a child of a main DataPipe.

    The instance of this class will pass its instance_id to get the next value from its main DataPipe.

    Note:
        ChildDataPipe, like all other IterDataPipe, follows the single iterator per IterDataPipe constraint.
        Since ChildDataPipes share a common buffer, when an iterator is created for one of the ChildDataPipes,
        the previous iterators  for all ChildDataPipes must be invalidated, with the exception when a ChildDataPipe
        hasn't had an iterator created from it since the last invalidation. See the example below.

    Example:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> # Singler Iterator per IteraDataPipe Invalidation
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper(range(10))
        >>> cdp1, cdp2 = source_dp.fork(num_instances=2)
        >>> it1, it2 = iter(cdp1), iter(cdp2)
        >>> it3 = iter(cdp1)
        >>> # The line above invalidates `it1` and `it2`, and resets `ForkerIterDataPipe`.
        >>> it4 = iter(cdp2)
        >>> # The line above doesn't invalidate `it3`, because an iterator for `cdp2` hasn't been created since
        >>> # the last invalidation.

    Args:
        main_datapipe: Main DataPipe with a method 'get_next_element_by_instance(instance_id)'
        instance_id: integer identifier of this instance
    T_is_child_datapipert   rb   c                 D    t        |t              sJ || _        || _        y r!   )r#   ra   rt   rb   )r0   rt   rb   s      r'   r1   z_ChildDataPipe.__init__P  s#    -);<<<+8&r2   c                 L    | j                   j                  | j                        S r!   )rt   re   rb   rB   s    r'   r5   z_ChildDataPipe.__iter__V  s!     !!>>t?O?OPPr2   c                 L    | j                   j                  | j                        S r!   )rt   rl   rb   rB   s    r'   rC   z_ChildDataPipe.__len__[  s    !!889I9IJJr2   r3   c                    | j                   j                  d| j                   _        n| j                   j                  | j                  k(  rm| j                   xj                  dz  c_        | j                   j                         st        j                  dt
               | j                   j                          | j                   j                  | _        | j                  S )z
        Update the valid iterator ID for both this DataPipe object and `main_datapipe`.

        `main_datapipe.reset()` is called when the ID is incremented to a new generation.
        r   rV   zSome child DataPipes are not exhausted when __iter__ is called. We are resetting the buffer and each child DataPipe will read from the start again.)rt   r   rh   rw   rx   ry   rj   rB   s    r'   $_set_main_datapipe_valid_iterator_idz3_ChildDataPipe._set_main_datapipe_valid_iterator_id_  s     00845D1 22d6M6MM11Q61%%AACY
 $$& #'"4"4"G"G&&&r2   c                 V    || j                   k(  xr || j                  j                   k(  S )zXCheck the valid iterator ID against that of DataPipe object and that of `main_datapipe`.)r   rt   )r0   iterator_ids     r'   _check_valid_iterator_idz'_ChildDataPipe._check_valid_iterator_idz  s1     4222 Et11DDD	
r2   N)rA   rE   rF   rG   r   rn   rH   r   rI   r1   r5   rC   r   r   rJ   r2   r'   rY   rY   0  sK    :  $#'l ' 'Q
K'c '6
t 
r2   rY   demuxc                   D    e Zd ZdZ	 	 d	dededeegee   f   de	def
dZ
y)
r   a+  
    Splits the input DataPipe into multiple child DataPipes, using the given classification function (functional name: ``demux``).

    A list of the child DataPipes is returned from this operation.

    Args:
        datapipe: Iterable DataPipe being filtered
        num_instances: number of instances of the DataPipe to create
        classifier_fn: a function that maps values to an integer within the range ``[0, num_instances - 1]`` or ``None``
        drop_none: defaults to ``False``, if ``True``, the function will skip over elements classified as ``None``
        buffer_size: this defines the maximum number of inputs that the buffer can hold across all child
            DataPipes while waiting for their values to be yielded.
            Defaults to ``1000``. Use ``-1`` for the unlimited buffer.

    Examples:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def odd_or_even(n):
        ...     return n % 2
        >>> source_dp = IterableWrapper(range(5))
        >>> dp1, dp2 = source_dp.demux(num_instances=2, classifier_fn=odd_or_even)
        >>> list(dp1)
        [0, 2, 4]
        >>> list(dp2)
        [1, 3]
        >>> # It can also filter out any element that gets `None` from the `classifier_fn`
        >>> def odd_or_even_no_zero(n):
        ...     return n % 2 if n != 0 else None
        >>> dp1, dp2 = source_dp.demux(num_instances=2, classifier_fn=odd_or_even_no_zero, drop_none=True)
        >>> list(dp1)
        [2, 4]
        >>> list(dp2)
        [1, 3]
    rM   rN   classifier_fn	drop_nonerO   c                     |dk  rt        d| d      t        |       t        |||||      }t        |      D cg c]  }t	        ||       c}S c c}w rU   )r,   r   _DemultiplexerIterDataPiperX   rY   )rZ   rM   rN   r   r   rO   r[   r\   s           r'   r]   z!DemultiplexerIterDataPipe.__new__  se     1>}oYW  	]+
 /xW`bmn	6;M6JKy!,KKKs   AN)Fr_   )rA   rE   rF   rG   r   rI   r   r   r   rn   r]   rJ   r2   r'   r   r     s[    !P  LL L  # 67	L
 L Lr2   r   c            
           e Zd ZdZdee   dedeegee   f   de	def
dZ
ded	efd
ZdefdZd	e	fdZded	efdZddZd Zd Zddee   fdZd Zy)r   z
    Container to hold instance-specific information on behalf of DemultiplexerIterDataPipe.

    It tracks the state of its child DataPipes, maintains the buffer, classifies and yields the next correct value
    as requested by the child DataPipes.
    rM   rN   r   r   rO   c                 b   || _         d | _        || _        || _        | j                  dk  rt	        j
                  dt               d| _        t        |      D cg c]  }t                c}| _
        || _        || _        d| _        t        |      D cg c]  }d c}| _        y c c}w c c}w )Nr   zQUnlimited buffer size is set for `demux`, please be aware of OOM at random placesFT)rt   ru   rN   rO   rw   rx   ry   current_buffer_usagerX   r   child_buffersr   r   main_datapipe_exhaustedr   )r0   rM   rN   r   r   rO   r   s          r'   r1   z#_DemultiplexerIterDataPipe.__init__  s     &;?*&aMM:
 %&!CHCW1Xa%'1X*"',$6;M6J'K'K	 2Y (Ls   B'	B,rb   r3   c                 r   	 | j                   s| j                  |   rt        | j                  t	        d      t        | j                        }| j                  |      }|"| j                  rt        j                  |       ||| j                  k\  s|dk  r"t	        d| j                  dz
   d| dz         ||k(  r|S | j                  |   j                  |       | xj                  dz  c_        | j                  dk\  r2| j                  | j                  kD  rt        d| j                   d      7)	Nz_datapipe_iterator has not been set, likely because this private method is called directly without invoking get_next_element_by_instance() first.r   z8Output of the classification fn should be between 0 and rV   z. z is returned.z7DemultiplexerIterDataPipe buffer overflow, buffer size r   )r   r   r   ru   r,   r   r   r   r   r   rN   r   r   r   rO   r   )r0   rb   valueclassifications       r'   
_find_nextz%_DemultiplexerIterDataPipe._find_next  sR   ++t/?/?/L##&&. M  001E!//6N%$..++E2&!T%7%77!A% NtOaOadeOeNffhi'(67  ,~.55e<%%*%1$)B)BTEUEU)U!MdN^N^M__pq 7 r2   c              #     K   | j                   n| j                  |   r_t        | j                        | _         t        j
                  | _        d| _        t        | j                        D ]  }d| j                  |<    	 | j                  |   sh| j                  |   r5| xj                  dz  c_        | j                  |   j                          n	 | j                  |       | j                  |   shd| j                  |<   t        | j                        rd | _         | j                  |   r| j!                  |       y y # t        $ r  d| j                  |<   d| _        d | _         Y w xY w# d| j                  |<   t        | j                        rd | _         | j                  |   r| j!                  |       w w xY ww)NFrV   T)ru   r   r   rt   r   r   r   r   rX   rN   r   r   r   r   r   r-   r   )r0   rb   r\   s      r'   re   z7_DemultiplexerIterDataPipe.get_next_element_by_instance  s    ""*t/?/?/L&*4+=+=&>D#((   ,1D(4--. ,&+  #,	+&&{3%%k2--2-,,[9AACC7"ook:: &&{3 -1D[)4##$*.'!!+.k* / ) 78<((57;426/7
 -1D[)4##$*.'!!+.k* /sE   A;F?>AE. E %E. 5AF?&E+(E. *E++E. .AF<<F?c                 H    | j                   xr t        | j                        S r!   )r   r-   r   rB   s    r'   rh   z6_DemultiplexerIterDataPipe.is_every_instance_exhausted  s    ++ED4D4D0EEr2   c                     t         r!   )r.   rd   s     r'   rl   z1_DemultiplexerIterDataPipe.get_length_by_instance  s    r2   Nc                     d | _         d| _        t        | j                        D cg c]  }t	                c}| _        t        | j                        D cg c]  }d c}| _        d| _        y c c}w c c}w Nr   TF)ru   r   rX   rN   r   r   r   r   r   s     r'   rj   z _DemultiplexerIterDataPipe.reset"  sa    "&$%!/4T5G5G/HI!egI*/0B0B*CDQDD',$ JDs   A/	A4c                     | j                   | j                  | j                  | j                  | j                  | j
                  | j                  f}t        j                  t        j                  |      S |S r!   )	rt   rN   rO   r   r   r   r   r   r   r   s     r'   r   z'_DemultiplexerIterDataPipe.__getstate__)  sj    NN##++
 %%1--e44r2   c                 L   |\  | _         | _        | _        | _        | _        | _        | _        d | _        d| _        t        | j                        D cg c]  }t                c}| _        t        | j                        D cg c]  }d c}| _        d| _        y c c}w c c}w r   )rt   rN   rO   r   r   r   r   ru   r   rX   r   r   r   r   r   s      r'   r   z'_DemultiplexerIterDataPipe.__setstate__7  s     	
N#+"&$%!/4T5G5G/HI!egI*/0B0B*CDQDD',$ JDs   B	B!c                     |t        | j                        n|g}|D ]<  }| j                  |   }|s|j                         }t	        j
                  |       |r(> y r!   )rX   rN   r   r   r   r   )r0   rb   idsr\   qr   s         r'   r   z#_DemultiplexerIterDataPipe._cleanupG  sk     " $$$%  	  	/A""1%AIIK++A. 	/r2   c                 $    | j                          y r!   r   rB   s    r'   r   z"_DemultiplexerIterDataPipe.__del__U  r   r2   rm   r!   )rA   rE   rF   rG   r   r   rI   r   r   rn   r1   r   re   rh   rl   rj   r   r   r   r   rJ   r2   r'   r   r     s    Lu%L L  # 67	L
 L L2c e @+ +<FT F# # -- /HSM /r2   r   muxc                   <    e Zd ZdZd Zd Zd Zd
dZd Zd Z	d	 Z
y)r   a  
    Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux``).

    As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration,
    and so on. It ends when the shortest input DataPipe is exhausted.

    Args:
        datapipes: Iterable DataPipes that will take turn to yield their elements, until the shortest DataPipe is exhausted

    Example:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
        >>> list(dp1.mux(dp2, dp3))
        [0, 10, 20, 1, 11, 21, 2, 12, 22]
    c                      || _         g | _        y r!   )r   rv   r/   s     r'   r1   z MultiplexerIterDataPipe.__init__l  s    " 	r2   c              #     K   | j                   D cg c]  }t        |       }}t        |      ri|D ])  }	 t        |      }| j                  j                  |       + | j                  E d {    | j                  j                          t        |      rhy y c c}w # t        $ r | j                  j                          Y  y w xY w7 [wr!   )r   r   r+   r   rv   r   r   clear)r0   rp   	iteratorsitr   s        r'   r5   z MultiplexerIterDataPipe.__iter__r  s     &*nn5T!W5	5)n  HEKK&&u- {{""KK )n 6 % KK%%' #sD   CBC&BC-C	.)CC#CCCCc                     t        d | j                  D              r2t        d | j                  D              t        | j                        z  S t	        t        |       j                   d      )Nc              3   <   K   | ]  }t        |t                y wr!   r8   r$   s     r'   r(   z2MultiplexerIterDataPipe.__len__.<locals>.<genexpr>  r9   r*   c              3   2   K   | ]  }t        |        y wr!   r;   r$   s     r'   r(   z2MultiplexerIterDataPipe.__len__.<locals>.<genexpr>  r<   r=   r>   )r-   r   r   r+   r.   r@   rA   rB   s    r'   rC   zMultiplexerIterDataPipe.__len__  sU    >t~~>>8883t~~;NNNtDz2233VWXXr2   Nc                     g | _         y r!   )rv   rB   s    r'   rj   zMultiplexerIterDataPipe.reset  s	    r2   c                     | j                   | j                  | j                  f}t        j                  t        j                  |      S |S r!   )r   r   r   r   r   r   s     r'   r   z$MultiplexerIterDataPipe.__getstate__  sF    NN##++

 %%1--e44r2   c                 <    |\  | _         | _        | _        g | _        y r!   )r   r   r   rv   r   s     r'   r   z$MultiplexerIterDataPipe.__setstate__  s$    
 		
N#+r2   c                 8    | j                   j                          y r!   )rv   r   rB   s    r'   r   zMultiplexerIterDataPipe.__del__  s    r2   rm   )rA   rE   rF   rG   r1   r5   rC   rj   r   r   r   rJ   r2   r'   r   r   Y  s+    "
 Yr2   r   zipc                   ^     e Zd ZU dZee   ed<   def fdZdeee	      fdZ
defdZ xZS )r   aa  
    Aggregates elements into a tuple from each of the input DataPipes (functional name: ``zip``).

    The output is stopped as soon as the shortest input DataPipe is exhausted.

    Args:
        *datapipes: Iterable DataPipes being aggregated

    Example:
        >>> # xdoctest: +REQUIRES(module:torchdata)
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
        >>> list(dp1.zip(dp2, dp3))
        [(0, 10, 20), (1, 11, 21), (2, 12, 22), (3, 13, 23), (4, 14, 24)]
    r   c                 j    t        d |D              st        d      t        |           || _        y )Nc              3   <   K   | ]  }t        |t                y wr!   r"   r$   s     r'   r(   z.ZipperIterDataPipe.__init__.<locals>.<genexpr>  r)   r*   zCAll inputs are required to be `IterDataPipe` for `ZipIterDataPipe`.)r-   r.   superr1   r   )r0   r   	__class__s     r'   r1   zZipperIterDataPipe.__init__  s5    D)DDX  	"r2   r3   c              #   ~   K   | j                   D cg c]  }t        |       }}t        | E d {    y c c}w 7 
wr!   )r   r   r   )r0   rM   r   s      r'   r5   zZipperIterDataPipe.__iter__  s7     48NNCT(^C	C	?"" D"s   =6=;=c                     t        d | j                  D              rt        d | j                  D              S t        t	        |       j
                   d      )Nc              3   <   K   | ]  }t        |t                y wr!   r8   r$   s     r'   r(   z-ZipperIterDataPipe.__len__.<locals>.<genexpr>  r9   r*   c              3   2   K   | ]  }t        |        y wr!   r;   r$   s     r'   r(   z-ZipperIterDataPipe.__len__.<locals>.<genexpr>  r<   r=   r>   )r-   r   r   r.   r@   rA   rB   s    r'   rC   zZipperIterDataPipe.__len__  rD   r2   )rA   rE   rF   rG   r   r   rH   r1   r	   r   r5   rI   rC   __classcell__)r   s   @r'   r   r     sD      \""#< ##(5<0 #Y Yr2   r   )(rP   r{   rw   abcr   r   collectionsr   typingr   r   r   r	   r
   r   r   r   r   r   %torch.utils.data.datapipes._decoratorr   )torch.utils.data.datapipes._hook_iteratorr   #torch.utils.data.datapipes.datapiper   'torch.utils.data.datapipes.utils.commonr   r   __all__r   r   r   ra   rq   rW   rY   r   r   r   r   rJ   r2   r'   <module>r      sF     #    F D < W 	4( X$Y< $Y $YN V,L ,L ,L^c c(X,(: XvO
\ O
d W7L 7L 7LtY/A Yx UAl A AH U#YeEl3 #Y #Yr2   