U
    g~&                     @  s   d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZ ddlmZ ddlmZmZmZmZ ddlmZ dgZed	Zed
ZG dd de	e ZG dd dZdS )    )annotationsN)AnyAsyncIteratorCallableGenericIterableTypeVar   )ConcurrencyError)	OP_BINARYOP_CONTOP_TEXTFrame)Data	Assemblerzutf-8Tc                   @  sh   e Zd ZdZddddZddddZd	dd
ddZd	dddZdddddZddddZ	dS )SimpleQueuez
    Simplified version of :class:`asyncio.Queue`.

    Provides only the subset of functionality needed by :class:`Assembler`.

    Nonereturnc                 C  s   t  | _d | _t | _d S N)asyncioZget_running_looploop
get_waitercollectionsdequequeueself r   ?/tmp/pip-unpacked-wheel-dx_q7dq3/websockets/asyncio/messages.py__init__#   s    
zSimpleQueue.__init__intc                 C  s
   t | jS r   )lenr   r   r   r   r    __len__(   s    zSimpleQueue.__len__r   )itemr   c                 C  s0   | j | | jdk	r,| j s,| jd dS )z+Put an item into the queue without waiting.N)r   appendr   doneZ
set_result)r   r%   r   r   r    put+   s    zSimpleQueue.putc                   sR   | j sH| jdk	rtd| j | _z| jI dH  W 5 | j  d| _X | j  S )z?Remove and return an item from the queue, waiting if necessary.Nzget is already running)r   r   r
   r   Zcreate_futurecancelpopleftr   r   r   r    get1   s    

zSimpleQueue.getzIterable[T])itemsr   c                 C  s0   | j dkstd| jr td| j| dS )z)Put back items into an empty, idle queue.Nz%cannot reset() while get() is runningz&cannot reset() while queue isn't empty)r   AssertionErrorr   extend)r   r,   r   r   r    reset>   s    zSimpleQueue.resetc                 C  s2   | j d k	r$| j  s$| j td | j  d S )Nstream of frames ended)r   r'   Zset_exceptionEOFErrorr   clearr   r   r   r    abortD   s    zSimpleQueue.abortN)
__name__
__module____qualname____doc__r!   r$   r(   r+   r/   r3   r   r   r   r    r      s   r   c                   @  s   e Zd ZdZdddd dd fddd	d	d
dddZd!dddddZd"dddddZdd
dddZd
dddZd
dddZ	d
ddd Z
dS )#r   a  
    Assemble messages from frames.

    :class:`Assembler` expects only data frames. The stream of frames must
    respect the protocol; if it doesn't, the behavior is undefined.

    Args:
        pause: Called when the buffer of frames goes above the high water mark;
            should pause reading from the network.
        resume: Called when the buffer of frames goes below the low water mark;
            should resume reading from the network.

       Nc                   C  s   d S r   r   r   r   r   r    <lambda>_       zAssembler.<lambda>c                   C  s   d S r   r   r   r   r   r    r9   `   r:   r"   z
int | NonezCallable[[], Any]r   )highlowpauseresumer   c                 C  sh   t  | _|d kr|d }|dk r(td||k r8td|| | _| _|| _|| _d| _d| _d| _	d S )N   r   z%low must be positive or equal to zeroz)high must be greater than or equal to lowF)
r   frames
ValueErrorr;   r<   r=   r>   pausedget_in_progressclosed)r   r;   r<   r=   r>   r   r   r    r!   [   s    zAssembler.__init__zbool | Noner   )decoder   c                   s&  | j rtd| jrtdd| _z| j I dH }W n tjk
rT   d| _ Y nX |   |j	t
ksv|j	tksvt|dkr|j	t
k}|g}|jsz| j I dH }W n* tjk
r   | j| d| _ Y nX |   |j	tkst|| qd| _ddd |D }|r| S |S dS )	a  
        Read the next message.

        :meth:`get` returns a single :class:`str` or :class:`bytes`.

        If the message is fragmented, :meth:`get` waits until the last frame is
        received, then it reassembles the message and returns it. To receive
        messages frame by frame, use :meth:`get_iter` instead.

        Args:
            decode: :obj:`False` disables UTF-8 decoding of text frames and
                returns :class:`bytes`. :obj:`True` forces UTF-8 decoding of
                binary frames and returns :class:`str`.

        Raises:
            EOFError: If the stream of frames has ended.
            ConcurrencyError: If two coroutines run :meth:`get` or
                :meth:`get_iter` concurrently.

        r0   &get() or get_iter() is already runningTNFr:   c                 s  s   | ]}|j V  qd S r   )data).0framer   r   r    	<genexpr>   s     z Assembler.get.<locals>.<genexpr>)rD   r1   rC   r
   r@   r+   r   CancelledErrormaybe_resumeopcoder   r   r-   finr/   r   r&   joinrE   )r   rE   rI   r@   rG   r   r   r    r+   z   s<    
zAssembler.getzAsyncIterator[Data]c                 C s  | j rtd| jrtdd| _z| j I dH }W n tjk
rT   d| _ Y nX |   |j	t
ksv|j	tksvt|dkr|j	t
k}|rt }||j|jV  n|jV  |js| j I dH }|   |j	tkst|r||j|jV  q|jV  qd| _dS )ap  
        Stream the next message.

        Iterating the return value of :meth:`get_iter` asynchronously yields a
        :class:`str` or :class:`bytes` for each frame in the message.

        The iterator must be fully consumed before calling :meth:`get_iter` or
        :meth:`get` again. Else, :exc:`ConcurrencyError` is raised.

        This method only makes sense for fragmented messages. If messages aren't
        fragmented, use :meth:`get` instead.

        Args:
            decode: :obj:`False` disables UTF-8 decoding of text frames and
                returns :class:`bytes`. :obj:`True` forces UTF-8 decoding of
                binary frames and returns :class:`str`.

        Raises:
            EOFError: If the stream of frames has ended.
            ConcurrencyError: If two coroutines run :meth:`get` or
                :meth:`get_iter` concurrently.

        r0   rF   TNF)rD   r1   rC   r
   r@   r+   r   rK   rL   rM   r   r   r-   UTF8DecoderrE   rG   rN   r   )r   rE   rI   decoderr   r   r    get_iter   s4    

zAssembler.get_iterr   )rI   r   c                 C  s&   | j rtd| j| |   dS )z
        Add ``frame`` to the next message.

        Raises:
            EOFError: If the stream of frames has ended.

        r0   N)rD   r1   r@   r(   maybe_pause)r   rI   r   r   r    r(      s    zAssembler.putr   c                 C  s(   t | j| jkr$| js$d| _|   dS )z7Pause the writer if queue is above the high water mark.TN)r#   r@   r;   rB   r=   r   r   r   r    rS   	  s    zAssembler.maybe_pausec                 C  s(   t | j| jkr$| jr$d| _|   dS )z7Resume the writer if queue is below the low water mark.FN)r#   r@   r<   rB   r>   r   r   r   r    rL     s    zAssembler.maybe_resumec                 C  s   | j r
dS d| _ | j  dS )z
        End the stream of frames.

        Callling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
        or :meth:`put` is safe. They will raise :exc:`EOFError`.

        NT)rD   r@   r3   r   r   r   r    close  s    zAssembler.close)N)N)r4   r5   r6   r7   r!   r+   rR   r(   rS   rL   rT   r   r   r   r    r   K   s   @A)
__future__r   r   codecsr   typingr   r   r   r   r   r   
exceptionsr
   r@   r   r   r   r   r   __all__getincrementaldecoderrP   r   r   r   r   r   r   r    <module>   s    	
0