U
    g-                     @  s  d dl mZ d dlZd dlZd dlmZ d dlZd dlZG dd deZ	ddddd	Z
d
dddZddddZddd
dddZdd
dddZddd
dddZddd
dddZddd
dddZddd
ddd Zddd
dd!d"Zddd
dd#d$Zd
dd%d&ZdS )'    )annotationsN)Protocolc                   @  s   e Zd ZddddddZdS )RawInput strpromptreturnc                 C  s   d S N )selfr   r   r   9/tmp/pip-unpacked-wheel-ks04xdmi/trio/_tests/test_repl.py__call__       zRawInput.__call__N)r   )__name__
__module____qualname__r   r   r   r   r   r      s   r   z	list[str])cmdsr	   c                   s(   t |  g dddd fdd}|S )z
    Pass in a list of strings.
    Returns a callable that returns each string, each time its called
    When there are not more strings to return, raise EOFError
    r   r   r   c                   s4    |  z
t W S  tk
r.   td Y nX d S r
   )appendnextStopIterationEOFError)r   Z	cmds_iterZpromptsr   r   _raw_helper   s
    

z$build_raw_input.<locals>._raw_helper)r   )iter)r   r   r   r   r   build_raw_input   s    r   None)r	   c               	   C  s8   t dg} |  dksttt |   W 5 Q R X dS )z"Quick test of our helper function.Zcmd1N)r   AssertionErrorpytestraisesr   )	raw_inputr   r   r   test_build_raw_input#   s    
r!   zdict[str, object]c                   C  s   dt iS )N__builtins__)r"   r   r   r   r   build_locals.   s    r#   zpytest.CaptureFixture[str]zpytest.MonkeyPatch)capsysmonkeypatchr	   c                   s~   t jjt d}tdddddddd	d
ddddg}||d| t j|I dH  |  \}}| ddddddgkszt	dS )z
    Run some basic commands through the interpreter while capturing stdout.
    Ensure that the interpreted prints the expected results.
    Zrepl_localszx = 1zprint(f'{x=}')z'hello'zdef func():z  print(x + 1)r   zfunc()zasync def afunc():z
  return 4zawait afunc()
import sysz"sys.stdout.write('hello stdout\n')r    Nzx=124zhello stdoutZ13)
trio_replTrioInteractiveConsoler#   r   setattrrun_repl
readouterr
splitlinesr   r$   r%   consoler    outerrr   r   r   test_basic_interaction2   s*    r5   )r%   r	   c              	     sT   t jjt d}tdg}| |d| tt t j	|I d H  W 5 Q R X d S )Nr&   zraise SystemExitr    )
r*   r+   r,   r#   r   r-   r   r   
SystemExitr.   )r%   r2   r    r   r   r   "test_system_exits_quit_interpreterW   s    r7   c              
     s~   t jjt d}tdddddddd	d
g	}||d| t j|I d H  |  \}}d|ksbtd|ksntd|ksztd S )Nr&   z#from trio._util import signal_raisez"import signal, trio, trio.lowlevelzasync def f():z`  trio.lowlevel.spawn_system_task(    trio.to_thread.run_sync,    signal_raise,signal.SIGINT,  )z  await trio.sleep_forever()z  print('should not see this')r   z	await f()z print('AFTER KeyboardInterrupt')r    KeyboardInterruptZshouldzAFTER KeyboardInterrupt	r*   r+   r,   r#   r   r-   r.   r/   r   r1   r   r   r   test_KI_interruptsc   s&    r:   c                   s`   t jjt d}tddddddg}||d| t j|I d H  |  \}}d	|ks\td S )
Nr&   r'   if sys.version_info < (3, 11):/  from exceptiongroup import BaseExceptionGroupr   z<raise BaseExceptionGroup('', [RuntimeError(), SystemExit()])!print('AFTER BaseExceptionGroup')r    AFTER BaseExceptionGroupr9   r1   r   r   r   test_system_exits_in_exc_group   s    
r?   c                   sb   t jjt d}tdddddddg}||d	| t j|I d H  |  \}}d
|ks^td S )Nr&   r'   r;   r<   r   zraise BaseExceptionGroup(z?  '', [BaseExceptionGroup('', [RuntimeError(), SystemExit()])])r=   r    r>   r9   r1   r   r   r   %test_system_exits_in_nested_exc_group   s    r@   c                   sp   t jjt d}tddg}||d| t j|I d H  |  \}}d|ksTtd|ks`td|ksltd S )Nr&   zraise BaseExceptionprint('AFTER BaseException')r    _threads.py_repl.pyAFTER BaseExceptionr9   r1   r   r   r   test_base_exception_captured   s    rE   c                   sX   t jjt d}tddg}||d| t j|I d H  |  \}}d|ksTtd S )Nr&   z&raise ExceptionGroup('', [KeyError()])zprint('AFTER ExceptionGroup')r    zAFTER ExceptionGroupr9   r1   r   r   r   test_exc_group_captured   s    rF   c                   sv   t jjt d}tdddddg}||d| t j|I d H  |  \}}d|ksZtd	|ksftd
|ksrtd S )Nr&   z-async def async_func_raises_base_exception():z  raise BaseExceptionr   z(await async_func_raises_base_exception()rA   r    rB   rC   rD   r9   r1   r   r   r   *test_base_exception_capture_from_coroutine   s    rG   c                  C  s(   t jtjddgdd} | jdks$tdS )zL
    Basic smoke test when running via the package __main__ entrypoint.
    z-mr*   s   exit())inputr   N)
subprocessrunsys
executable
returncoder   )replr   r   r   test_main_entrypoint   s    rO   )
__future__r   rI   rK   typingr   r   Z
trio._replr*   r   r   r!   r#   r5   r7   r:   r?   r@   rE   rF   rG   rO   r   r   r   r   <module>   s$   %