U
    gZ                     @  s|  U d dl mZ d dlZd dlmZmZmZmZ ddlm	Z	m
Z
 ddlmZ ddlmZmZ erjd dlmZ eg ee f Zd	ed
< eg ef Zd	ed< ededZededZG dd dZe
jG dd deZe
jG dd deZdddddddddZddddZdd d!d"d#Zd$dd%d&ZG d'd( d(Z G d)d* d*eZ!G d+d, d,eZ"d-dd.d/Z#d0dd1d2Z$dS )3    )annotationsN)TYPE_CHECKING	AwaitableCallableTypeVar   )_core_utilStapledStream)ReceiveStream
SendStream)	TypeAliasr   	AsyncHookSyncHookSendStreamT)boundReceiveStreamTc                   @  s   e Zd ZddddZddddZddddZd	dd
ddZdddddZdddddZddddddZ	ddddddZ
dS )_UnboundedByteQueueNonereturnc                 C  s(   t  | _d| _t | _td| _d S )NFz%another task is already fetching data)		bytearray_data_closedr   
ParkingLot_lotr	   ConflictDetector_fetch_lockself r!   @/tmp/pip-unpacked-wheel-ks04xdmi/trio/testing/_memory_streams.py__init__   s    
z_UnboundedByteQueue.__init__c                 C  s   d| _ | j  d S NT)r   r   
unpark_allr   r!   r!   r"   close'   s    z_UnboundedByteQueue.closec                 C  s   t  | _|   d S N)r   r   r&   r   r!   r!   r"   close_and_wipe+   s    z"_UnboundedByteQueue.close_and_wipebytes | bytearray | memoryviewdatar   c                 C  s,   | j rtd|  j|7  _| j  d S )Nzvirtual connection closed)r   r   ClosedResourceErrorr   r   r%   r    r+   r!   r!   r"   put/   s    
z_UnboundedByteQueue.put
int | None	max_bytesr   c                 C  s*   |d krd S t |}|dk r&tdd S )N   max_bytes must be >= 1)operatorindex
ValueErrorr    r1   r!   r!   r"   _check_max_bytes5   s
    
z$_UnboundedByteQueue._check_max_bytesr   c                 C  sX   | j s| jst|d kr"t| j}| jrN| jd | }| jd |= |sJt|S t S d S r'   )r   r   AssertionErrorlenr   )r    r1   chunkr!   r!   r"   	_get_impl<   s    
z_UnboundedByteQueue._get_implNc              
   C  sD   | j 4 | | | js$| js$tj| |W  5 Q R  S Q R X d S r'   )r   r8   r   r   r   
WouldBlockr<   r7   r!   r!   r"   
get_nowaitH   s
    
z_UnboundedByteQueue.get_nowaitc              
     s^   | j N | | | js0| js0| j I d H  nt I d H  | |W  5 Q R  S Q R X d S r'   )	r   r8   r   r   r   parkr   
checkpointr<   r7   r!   r!   r"   getO   s    
z_UnboundedByteQueue.get)N)N)__name__
__module____qualname__r#   r&   r(   r.   r8   r<   r>   rA   r!   r!   r!   r"   r      s   r   c                   @  s   e Zd ZdZdddddddZdd	d
ddZd	dddZd	dddZd	dddZddddddZ	ddddddZ
dS )MemorySendStreama  An in-memory :class:`~trio.abc.SendStream`.

    Args:
      send_all_hook: An async function, or None. Called from
          :meth:`send_all`. Can do whatever you like.
      wait_send_all_might_not_block_hook: An async function, or None. Called
          from :meth:`wait_send_all_might_not_block`. Can do whatever you
          like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: send_all_hook
                   wait_send_all_might_not_block_hook
                   close_hook

       All of these hooks are also exposed as attributes on the object, and
       you can change them at any time.

    NAsyncHook | NoneSyncHook | None)send_all_hook"wait_send_all_might_not_block_hook
close_hookc                 C  s*   t d| _t | _|| _|| _|| _d S )N!another task is using this stream)r	   r   _conflict_detectorr   	_outgoingrH   rI   rJ   )r    rH   rI   rJ   r!   r!   r"   r#   o   s    zMemorySendStream.__init__r)   r   r*   c              	     sV   | j F t I dH  t I dH  | j| | jdk	rH|  I dH  W 5 Q R X dS )z}Places the given data into the object's internal buffer, and then
        calls the :attr:`send_all_hook` (if any).

        N)rL   r   r@   rM   r.   rH   r-   r!   r!   r"   send_all}   s    
zMemorySendStream.send_allr   c              	     sV   | j F t I dH  t I dH  | jd | jdk	rH|  I dH  W 5 Q R X dS )znCalls the :attr:`wait_send_all_might_not_block_hook` (if any), and
        then returns immediately.

        N    )rL   r   r@   rM   r.   rI   r   r!   r!   r"   wait_send_all_might_not_block   s    
z.MemorySendStream.wait_send_all_might_not_blockc                 C  s    | j   | jdk	r|   dS )z^Marks this stream as closed, and then calls the :attr:`close_hook`
        (if any).

        N)rM   r&   rJ   r   r!   r!   r"   r&      s    

zMemorySendStream.closec                   s   |    t I dH  dS z!Same as :meth:`close`, but async.Nr&   r   r@   r   r!   r!   r"   aclose   s    zMemorySendStream.acloser/   r   r0   c                   s   | j |I dH S )a  Retrieves data from the internal buffer, blocking if necessary.

        Args:
          max_bytes (int or None): The maximum amount of data to
              retrieve. None (the default) means to retrieve all the data
              that's present (but still blocks until at least one byte is
              available).

        Returns:
          If this stream has been closed, an empty bytearray. Otherwise, the
          requested data.

        N)rM   rA   r7   r!   r!   r"   get_data   s    zMemorySendStream.get_datac                 C  s   | j |S )zRetrieves data from the internal buffer, but doesn't block.

        See :meth:`get_data` for details.

        Raises:
          trio.WouldBlock: if no data is available to retrieve.

        )rM   r>   r7   r!   r!   r"   get_data_nowait   s    	z MemorySendStream.get_data_nowait)NNN)N)N)rB   rC   rD   __doc__r#   rN   rP   r&   rS   rT   rU   r!   r!   r!   r"   rE   Y   s      rE   c                   @  sn   e Zd ZdZddddddZddd	d
ddZddddZddddZdddddZddddZ	dS )MemoryReceiveStreama  An in-memory :class:`~trio.abc.ReceiveStream`.

    Args:
      receive_some_hook: An async function, or None. Called from
          :meth:`receive_some`. Can do whatever you like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: receive_some_hook
                   close_hook

       Both hooks are also exposed as attributes on the object, and you can
       change them at any time.

    NrF   rG   )receive_some_hookrJ   c                 C  s*   t d| _t | _d| _|| _|| _d S )NrK   F)r	   r   rL   r   	_incomingr   rX   rJ   )r    rX   rJ   r!   r!   r"   r#      s    zMemoryReceiveStream.__init__r/   r   r0   c              
     s   | j p t I dH  t I dH  | jr0tj| jdk	rH|  I dH  | j|I dH }| jrftj|W  5 Q R  S Q R X dS )zCalls the :attr:`receive_some_hook` (if any), and then retrieves
        data from the internal buffer, blocking if necessary.

        N)rL   r   r@   r   r,   rX   rY   rA   )r    r1   r+   r!   r!   r"   receive_some   s    
z MemoryReceiveStream.receive_somer   r   c                 C  s&   d| _ | j  | jdk	r"|   dS )zfDiscards any pending data from the internal buffer, and marks this
        stream as closed.

        TN)r   rY   r(   rJ   r   r!   r!   r"   r&     s    

zMemoryReceiveStream.closec                   s   |    t I dH  dS rQ   rR   r   r!   r!   r"   rS     s    zMemoryReceiveStream.acloser)   r*   c                 C  s   | j | dS )z.Appends the given data to the internal buffer.N)rY   r.   r-   r!   r!   r"   put_data  s    zMemoryReceiveStream.put_datac                 C  s   | j   dS )z2Adds an end-of-file marker to the internal buffer.N)rY   r&   r   r!   r!   r"   put_eof  s    zMemoryReceiveStream.put_eof)NN)N)
rB   rC   rD   rV   r#   rZ   r&   rS   r[   r\   r!   r!   r!   r"   rW      s     
rW   )r1   r/   bool)memory_send_streammemory_receive_streamr1   r   c                C  sl   z|  |}W n tjk
r&   Y dS X z|s8|  n
|| W n" tjk
rf   tddY nX dS )a  Take data out of the given :class:`MemorySendStream`'s internal buffer,
    and put it into the given :class:`MemoryReceiveStream`'s internal buffer.

    Args:
      memory_send_stream (MemorySendStream): The stream to get data from.
      memory_receive_stream (MemoryReceiveStream): The stream to put data into.
      max_bytes (int or None): The maximum amount of data to transfer in this
          call, or None to transfer all available data.

    Returns:
      True if it successfully transferred some data, or False if there was no
      data to transfer.

    This is used to implement :func:`memory_stream_one_way_pair` and
    :func:`memory_stream_pair`; see the latter's docstring for an example
    of how you might use it yourself.

    FzMemoryReceiveStream was closedNT)rU   r   r=   r\   r[   r,   BrokenResourceError)r^   r_   r1   r+   r!   r!   r"   memory_stream_pump  s    
ra   z,tuple[MemorySendStream, MemoryReceiveStream]r   c                    sF   t  t ddfdd dd fdd} | _ _fS )uQ  Create a connected, pure-Python, unidirectional stream with infinite
    buffering and flexible configuration options.

    You can think of this as being a no-operating-system-involved
    Trio-streamsified version of :func:`os.pipe` (except that :func:`os.pipe`
    returns the streams in the wrong order – we follow the superior convention
    that data flows from left to right).

    Returns:
      A tuple (:class:`MemorySendStream`, :class:`MemoryReceiveStream`), where
      the :class:`MemorySendStream` has its hooks set up so that it calls
      :func:`memory_stream_pump` from its
      :attr:`~MemorySendStream.send_all_hook` and
      :attr:`~MemorySendStream.close_hook`.

    The end result is that data automatically flows from the
    :class:`MemorySendStream` to the :class:`MemoryReceiveStream`. But you're
    also free to rearrange things however you like. For example, you can
    temporarily set the :attr:`~MemorySendStream.send_all_hook` to None if you
    want to simulate a stall in data transmission. Or see
    :func:`memory_stream_pair` for a more elaborate example.

    r   r   c                     s   t   d S r'   )ra   r!   )recv_streamsend_streamr!   r"   $pump_from_send_stream_to_recv_stream[  s    zHmemory_stream_one_way_pair.<locals>.pump_from_send_stream_to_recv_streamc                     s
      d S r'   r!   r!   )rd   r!   r"   *async_pump_from_send_stream_to_recv_stream^  s    zNmemory_stream_one_way_pair.<locals>.async_pump_from_send_stream_to_recv_stream)rE   rW   rH   rJ   )re   r!   )rd   rb   rc   r"   memory_stream_one_way_pair@  s    rf   z0Callable[[], tuple[SendStreamT, ReceiveStreamT]]z]tuple[StapledStream[SendStreamT, ReceiveStreamT], StapledStream[SendStreamT, ReceiveStreamT]])one_way_pairr   c                 C  s0   |  \}}|  \}}t ||}t ||}||fS r'   r
   )rg   Z
pipe1_sendZ
pipe1_recvZ
pipe2_sendZ
pipe2_recvZstream1Zstream2r!   r!   r"   _make_stapled_pairf  s
    



rh   zqtuple[StapledStream[MemorySendStream, MemoryReceiveStream], StapledStream[MemorySendStream, MemoryReceiveStream]]c                   C  s   t tS )a  Create a connected, pure-Python, bidirectional stream with infinite
    buffering and flexible configuration options.

    This is a convenience function that creates two one-way streams using
    :func:`memory_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    This is like a no-operating-system-involved, Trio-streamsified version of
    :func:`socket.socketpair`.

    Returns:
      A pair of :class:`~trio.StapledStream` objects that are connected so
      that data automatically flows from one to the other in both directions.

    After creating a stream pair, you can send data back and forth, which is
    enough for simple tests::

       left, right = memory_stream_pair()
       await left.send_all(b"123")
       assert await right.receive_some() == b"123"
       await right.send_all(b"456")
       assert await left.receive_some() == b"456"

    But if you read the docs for :class:`~trio.StapledStream` and
    :func:`memory_stream_one_way_pair`, you'll see that all the pieces
    involved in wiring this up are public APIs, so you can adjust to suit the
    requirements of your tests. For example, here's how to tweak a stream so
    that data flowing from left to right trickles in one byte at a time (but
    data flowing from right to left proceeds at full speed)::

        left, right = memory_stream_pair()
        async def trickle():
            # left is a StapledStream, and left.send_stream is a MemorySendStream
            # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
            while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
                # Pause between each byte
                await trio.sleep(1)
        # Normally this send_all_hook calls memory_stream_pump directly without
        # passing in a max_bytes. We replace it with our custom version:
        left.send_stream.send_all_hook = trickle

    And here's a simple test using our modified stream objects::

        async def sender():
            await left.send_all(b"12345")
            await left.send_eof()

        async def receiver():
            async for data in right:
                print(data)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(receiver)

    By default, this will print ``b"12345"`` and then immediately exit; with
    our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then
    sleeps 1 second, then prints ``b"2"``, etc.

    Pro-tip: you can insert sleep calls (like in our example above) to
    manipulate the flow of data across tasks... and then use
    :class:`MockClock` and its :attr:`~MockClock.autojump_threshold`
    functionality to keep your test suite running quickly.

    If you want to stress test a protocol implementation, one nice trick is to
    use the :mod:`random` module (preferably with a fixed seed) to move random
    numbers of bytes at a time, and insert random sleeps in between them. You
    can also set up a custom :attr:`~MemoryReceiveStream.receive_some_hook` if
    you want to manipulate things on the receiving side, and not just the
    sending side.

    )rh   rf   r!   r!   r!   r"   memory_stream_pairs  s    Mri   c                   @  s   e Zd ZddddZddddZdddd	d
ZddddZddddZdddddZddddZ	ddddddZ
dS )_LockstepByteQueuer   r   c                 C  s@   t  | _d| _d| _d| _t | _t	d| _
t	d| _d S )NFzanother task is already sendingz!another task is already receiving)r   r   _sender_closed_receiver_closed_receiver_waitingr   r   _waitersr	   r   _send_conflict_detector_receive_conflict_detectorr   r!   r!   r"   r#     s    
z_LockstepByteQueue.__init__c                 C  s   | j   d S r'   )rn   r%   r   r!   r!   r"   _something_happened  s    z&_LockstepByteQueue._something_happenedzCallable[[], bool])fnr   c                   s:   | rq(| j s(| jrq(| j I d H  q t I d H  d S r'   )rk   rl   rn   r?   r   r@   )r    rr   r!   r!   r"   	_wait_for  s    z_LockstepByteQueue._wait_forc                 C  s   d| _ |   d S r$   )rk   rq   r   r!   r!   r"   close_sender  s    z_LockstepByteQueue.close_senderc                 C  s   d| _ |   d S r$   )rl   rq   r   r!   r!   r"   close_receiver  s    z!_LockstepByteQueue.close_receiverr)   r*   c              	     s    j t  jrtj jr tj jr*t  j|7  _    	 fddI d H   jrdtj jrv jrvtjW 5 Q R X d S )Nc                     s
    j dkS NrO   r   r!   r   r!   r"   <lambda>  rO   z-_LockstepByteQueue.send_all.<locals>.<lambda>)
ro   rk   r   r,   rl   r`   r   r9   rq   rs   r-   r!   r   r"   rN     s    
z_LockstepByteQueue.send_allc              	     sh    j X  jrtj jr6t I d H  W 5 Q R  d S   fddI d H   jrZtjW 5 Q R X d S )Nc                     s    j S r'   )rm   r!   r   r!   r"   rx     rO   zB_LockstepByteQueue.wait_send_all_might_not_block.<locals>.<lambda>)ro   rk   r   r,   rl   r@   rs   r   r!   r   r"   rP     s    z0_LockstepByteQueue.wait_send_all_might_not_blockNr/   bytes | bytearrayr0   c              
     s    j  |d k	r*t|}|dk r*td jr6tjd _   z 	 fddI d H  W 5 d _X  jrvtj j
r j
d | } j
d |=    |W  5 Q R  S  jstW 5 Q R  dS W 5 Q R X d S )Nr2   r3   TFc                     s
    j dkS rv   rw   r!   r   r!   r"   rx     rO   z1_LockstepByteQueue.receive_some.<locals>.<lambda>rO   )rp   r4   r5   r6   rl   r   r,   rm   rq   rs   r   rk   r9   )r    r1   gotr!   r   r"   rZ     s*    

z_LockstepByteQueue.receive_some)N)rB   rC   rD   r#   rq   rs   rt   ru   rN   rP   rZ   r!   r!   r!   r"   rj     s   	rj   c                   @  sT   e Zd ZddddZddddZddd	d
ZdddddZddddZdS )_LockstepSendStreamrj   lbqc                 C  s
   || _ d S r'   _lbqr    r}   r!   r!   r"   r#   '  s    z_LockstepSendStream.__init__r   r   c                 C  s   | j   d S r'   )r   rt   r   r!   r!   r"   r&   *  s    z_LockstepSendStream.closec                   s   |    t I d H  d S r'   rR   r   r!   r!   r"   rS   -  s    z_LockstepSendStream.acloser)   r*   c                   s   | j |I d H  d S r'   )r   rN   r-   r!   r!   r"   rN   1  s    z_LockstepSendStream.send_allc                   s   | j  I d H  d S r'   )r   rP   r   r!   r!   r"   rP   4  s    z1_LockstepSendStream.wait_send_all_might_not_blockN)rB   rC   rD   r#   r&   rS   rN   rP   r!   r!   r!   r"   r{   &  s
   r{   c                   @  sH   e Zd ZddddZddddZddd	d
ZddddddZdS )_LockstepReceiveStreamrj   r|   c                 C  s
   || _ d S r'   r~   r   r!   r!   r"   r#   9  s    z_LockstepReceiveStream.__init__r   r   c                 C  s   | j   d S r'   )r   ru   r   r!   r!   r"   r&   <  s    z_LockstepReceiveStream.closec                   s   |    t I d H  d S r'   rR   r   r!   r!   r"   rS   ?  s    z_LockstepReceiveStream.acloseNr/   ry   r0   c                   s   | j |I d H S r'   )r   rZ   r7   r!   r!   r"   rZ   C  s    z#_LockstepReceiveStream.receive_some)N)rB   rC   rD   r#   r&   rS   rZ   r!   r!   r!   r"   r   8  s   r   z tuple[SendStream, ReceiveStream]c                  C  s   t  } t| t| fS )a  Create a connected, pure Python, unidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple
      (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`).

    This stream has *absolutely no* buffering. Each call to
    :meth:`~trio.abc.SendStream.send_all` will block until all the given data
    has been returned by a call to
    :meth:`~trio.abc.ReceiveStream.receive_some`.

    This can be useful for testing flow control mechanisms in an extreme case,
    or for setting up "clogged" streams to use with
    :func:`check_one_way_stream` and friends.

    In addition to fulfilling the :class:`~trio.abc.SendStream` and
    :class:`~trio.abc.ReceiveStream` interfaces, the return objects
    also have a synchronous ``close`` method.

    )rj   r{   r   r|   r!   r!   r"   lockstep_stream_one_way_pairG  s    r   zYtuple[StapledStream[SendStream, ReceiveStream], StapledStream[SendStream, ReceiveStream]]c                   C  s   t tS )a  Create a connected, pure-Python, bidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple (:class:`~trio.StapledStream`, :class:`~trio.StapledStream`).

    This is a convenience function that creates two one-way streams using
    :func:`lockstep_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    )rh   r   r!   r!   r!   r"   lockstep_stream_pairb  s    r   )%
__future__r   r4   typingr   r   r   r    r   r	   Z_highlevel_genericr   abcr   r   Ztyping_extensionsr   objectr   __annotations__r   r   r   r   finalrE   rW   ra   rf   rh   ri   rj   r{   r   r   r   r!   r!   r!   r"   <module>   s6    ?rQ&&U^