Skip to main content
Glama
test_vm_edge_cases.cpython-313-pytest-8.3.5.pyc17.2 kB
� �*h��j�SSKrSSKJs Jr SSKrSSKJrJ r SSK r SSK J r J r JrJrJr SSKr\R$S5r\R$S5r\R*R"S5r\R*R"S5r\R*R"S5r\R*R"S 5r\R*R"S 5r\R*R"S 5r\R*R"S 5r\R*R"S 5rg)�N)�patch� MagicMock)�start_vm�stop_vm� reboot_vm�list_vms� get_vnc_portsc��[5nU$)z,Fixture to provide a mock libvirt connection�r)�conns �8/home/steve/projects/kvm-mcp/tests/test_vm_edge_cases.py� mock_connrs�� �;�D� �K�c��[5nU$)z(Fixture to provide a mock libvirt domainr )�domains r � mock_domainr s���[�F� �Mrc��# �[S5nXlXRl[RS4UR l[ SSS05IShv�N nUSnSoTU:Hof(do[R"S U4S XE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSoCSoTU;of(do[R"SU4SXE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSSS5 gGN!,(df  g=f7f)z*Test starting a VM that is already running� libvirt.openrr�name�test-vmN�status�error��==�z%(py1)s == %(py4)s��py1�py4�assert %(py6)s�py6zVM test-vm is already running�message��in�z%(py1)s in %(py4)s) r� return_value� lookupByName�libvirt�VIR_DOMAIN_RUNNING�stater� @pytest_ar�_call_reprcompare� _saferepr�AssertionError�_format_explanation� rr�mock_libvirt_open�result� @py_assert0� @py_assert3� @py_assert2� @py_format5� @py_format7s r �test_start_vm_already_runningr7s��� �~� �"3�)2�&�.9���+�*1�*D�*D�a�)H� ���&�� �V�Y�,?�@�@���h��*�7�*�7�*�*�*�*�*��*�*�*��*�*�*�7�*�*�*�*�*�*�*�.�C��2C�C�2C�C�C�C�C�C�.�C�C�C�.�C�C�C�2C�C�C�C�C�C�C�C� � � A� � ��0� F�A E1�E.�D E1�% F�.E1�1 E?�;Fc��# �[S5nXlXRl[RS4UR l[ SSS05IShv�N nUSnSoTU:Hof(do[R"S U4S XE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSoCSoTU;of(do[R"SU4SXE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSSS5 gGN!,(df  g=f7f)z*Test stopping a VM that is already stoppedrrrrrNrrrrrrr zVM test-vm is already stoppedr!r"r$) rr%r&r'�VIR_DOMAIN_SHUTOFFr)rr*r+r,r-r.r/s r �test_stop_vm_already_stoppedr;s��� �~� �"3�)2�&�.9���+�*1�*D�*D�a�)H� ���&��y�6�9�*=�>�>���h��*�7�*�7�*�*�*�*�*��*�*�*��*�*�*�7�*�*�*�*�*�*�*�.�C��2C�C�2C�C�C�C�C�C�.�C�C�C�.�C�C�C�2C�C�C�C�C�C�C�C� � � ?� � �r8c��# �[S5nXlXRl[RS4UR l[ SSS05IShv�N nUSnSoTU:Hof(do[R"S U4S XE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSoCSoTU;of(do[R"SU4SXE45[R"U5[R"U5S .-nS S U0-n[[R"U55eS=n=peSSS5 gGN!,(df  g=f7f)z#Test rebooting a VM that is stoppedrrrrrNrrrrrrr z+Cannot reboot VM test-vm: VM is not runningr!r"r$) rr%r&r'r:r)rr*r+r,r-r.r/s r �test_reboot_vm_when_stoppedr=+s��� �~� �"3�)2�&�.9���+�*1�*D�*D�a�)H� ���&� ��v�y�.A�B�B���h��*�7�*�7�*�*�*�*�*��*�*�*��*�*�*�7�*�*�*�*�*�*�*�<�Q�y�@Q�Q�@Q�Q�Q�Q�Q�Q�<�Q�Q�Q�<�Q�Q�Q�@Q�Q�Q�Q�Q�Q�Q�Q� � � C� � �r8c��# �[[[/nUGH*nU"URSS05IShv�N nUSnSoCU:HoU(do[R "SU4SX445[R "U5[R "U5S.-nS S U0-n[[R"U55eS=n=pTS o2S oCU;oU(do[R "S U4SX445[R "U5[R "U5S.-nS S U0-n[[R"U55eS=n=pTGM- gGN7f)z%Test VM operations with empty VM namer�Nrrrrrrr zVM name not providedr!r"r$) rrr�__name__r*r+r,r-r.)� operations�opr1r2r3r4r5r6s r �"test_vm_operations_with_empty_namerC7s�����G�Y�/�J����"�+�+���|�4�4���h��*�7�*�7�*�*�*�*�*��*�*�*��*�*�*�7�*�*�*�*�*�*�*�%�:� �):�:�):�:�:�:�:�:�%�:�:�:�%�:�:�:�):�:�:�:�:�:�:�:��4�s�0E �E�DE c ��:# �Sn[[[/n[S5nXl[ R "S5URlUGH*nU"URSU05IShv�N nUSnSovU:Ho�(do[R"SU4S Xg45[R"U5[R"U5S .-n S S U 0-n [[R"U 55eS=n=p�SoeS ovU;o�(do[R"SU4SXg45[R"U5[R"U5S .-n S S U 0-n [[R"U 55eS=n=p�GM- SSS5 gGN!,(df  g=f7f)z.Test VM operations with extremely long VM name�aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaarzInvalid VM namerNrrrrrrr r!r"r$)rrrrr%r'� libvirtErrorr&� side_effectr@r*r+r,r-r.) r�very_long_namerAr0rBr1r2r3r4r5r6s r �&test_vm_operations_with_very_long_namerI@s����N��G�Y�/�J� �~� �"3�)2�&�-4�-A�-A�BS�-T� ���*��B��b�k�k�F�N�+C�D�D�F��(�#� .�w� .�w�.� .� .� .� .�#� .� .� .�#� .� .� .�w� .� .� .� .� .� .� .�$� 9�y�(9� 9�(9�9� 9� 9� 9� 9�$� 9� 9� 9�$� 9� 9� 9�(9� 9� 9� 9� 9� 9� 9� 9�� � � E� � �s0�F�A F �+F�,DF �> F�F � F�Fc ��F# �[S5nXl/n[RS4[RS4[R S4[R S4S/n[U5H{unupV[5nSU3URlXV4URlXGRlSURlSURlURU5 M} X Rl[!S05IShv�N n[#U5n [#U5o�U :Ho�(Gd�[$R&"S U 4S X�45S [(R*"5;d[$R,"["5(a[$R."["5OS S [(R*"5;d[$R,"U5(a[$R."U5OS [$R."U 5S [(R*"5;d[$R,"["5(a[$R."["5OS S [(R*"5;d[$R,"U5(a[$R."U5OS [$R."U 5S.-n SSU 0-n [1[$R2"U 55eS=n =p�[RS[RS[R S[R S0n[U5H�upOUR5X4SS5nUSnUU:Ho�(d�[$R&"S U 4SUU45[$R."U5S[(R*"5;d[$R,"U5(a[$R."U5OSS.-nSSU0-n[1[$R2"U55eS=nn M� SSS5 gGN!,(df  g=f7f)zITest listing VMs with various states including crashed and unknown statesrr)i�rzvm-i�rNr)zN%(py3)s {%(py3)s = %(py0)s(%(py1)s) } == %(py8)s {%(py8)s = %(py5)s(%(py6)s) }�lenr1�states)�py0r�py3�py5r �py8zassert %(py10)s�py10�running�shutoff�crashedzno state�unknownr))z%(py1)s == %(py3)s�expected_state�rrO�assert %(py5)srP)rr%r'r(r:�VIR_DOMAIN_CRASHED�VIR_DOMAIN_NOSTATE� enumeraterrr)�ID� maxMemory�maxVcpus�append�listAllDomainsrrLr*r+� @py_builtins�locals�_should_repr_global_namer,r-r.�get)rr0�domainsrM�ir)�reasonrr1r4� @py_assert7� @py_assert4� @py_format9� @py_format11� state_map�vmrWr2� @py_format4� @py_format6s r �test_list_vms_with_mixed_statesrqOs���� �~� �"3�)2�&��� � '� '�� +� � '� '�� +� � '� '�� +� � '� '�� +� �  ��#,�F�"3� �A����[�F�),�Q�C�y�F�K�K� $�).��F�L�L� %�%&�I�I� "�,7�F� � � )�+,�F�O�O� (� �N�N�6� "�#4�18� � �-�� �B�/�/���6�{�)�c�&�k�)�k�)�)�)�)�)�{�)�)�)�)�)�)�s�)�)�)�)�s�)�)�)�)�)�)�6�)�)�)�)�6�)�)�)�{�)�)�)�)�)�)�c�)�)�)�)�c�)�)�)�)�)�)�&�)�)�)�)�&�)�)�)�k�)�)�)�)�)�)�)� � &� &� � � &� &� � � &� &� � � &� &� �  � ��v�&�E�A�&�]�]�6�9�Q�<��C�N��g�;� 0�;�.�0� 0� 0� 0� 0�;�.� 0� 0� 0�;� 0� 0� 0� 0� 0� 0�.� 0� 0� 0� 0�.� 0� 0� 0� 0� 0� 0� 0�'�C � �.0�/ � �s0� P!�C9P�P �K<P� P!� P� P�P!c ��h^ # �[S5n[S5nXlS00S.SSSS.0S.S S S S S .SSS.S.SSS00S./nUGH1m U 4SjnXBl[S05IShv�N nUSnSovU:Ho�(do[R "SU4SXg45[R "U5[R "U5S.-n SSU 0-n [[R"U 55eS=n=p�USnT SovU:Ho�(do[R "SU4SXg45[R "U5[R "U5S.-n SSU 0-n [[R"U 55eS=n=p�GM4 SSS5 SSS5 gGN(!,(df  N=f!,(df  g=f7f) z3Test VNC port retrieval with malformed virsh outputrzsubprocess.runr?)� list_stdout�vncdisplay_outputs�expected_portsztest-vm test-vm2 �abc�def)r�test-vm2ztest-vm test-vm2 test-vm3 z:1�invalidz:2)rrx�test-vm3i i)rrzztest-vm rz:99999c�>�USS:Xa[STSSS9$USS:Xa#USn[STS RUS5SS9$g) N��listrrsr?)� returncode�stdout�stderr� vncdisplayrKrt)rre)�cmd�kwargs�vm_name�cases �r �mock_run_command�=test_get_vnc_ports_malformed_output.<locals>.mock_run_command�so����q�6�V�#�$�#$�#�M�2�!��� ��V�|�+�!�!�f�G�$�#$�#�$8�9�=�=�g�r�J�!���,rr Nr�successrrrrr � vnc_portsru) rr%rGr r*r+r,r-r.) rr0�mock_run� test_casesr�r1r2r3r4r5r6r�s @r �#test_get_vnc_ports_malformed_outputr�ws����� �~� �"3� �� �H�)2�&� "�&(�"$� � 5�$� %�'�#%�  � ?�#� )� $�'� $� $�#� � +��x�'�#%�  �=% � �N�D� �$4� �(��"�=�=�F��(�#� 0�y� 0�y�0� 0� 0� 0� 0�#� 0� 0� 0�#� 0� 0� 0�y� 0� 0� 0� 0� 0� 0� 0��+�&� @�$�/?�*@� @�*@�@� @� @� @� @�&� @� @� @�&� @� @� @�*@� @� @� @� @� @� @� @�)�W !� � �|>�{ !� �� � �sG� F2� F!�A F�%F �&DF�<F!� F2� F� F �F!�! F/�+F2c ��8# �[S5nXlXRl[RS4UR l[ SSS05[SSS05[SSS05/n[R"U6IShv�N nUGHlnS ofU;ow(d�[R"S U4S Xe45[R"U5S [R"5;d[R "U5(a[R"U5OS S .-nSSU0-n [#[R$"U 55eS=pgSofU;ow(d�[R"S U4S Xe45[R"U5S [R"5;d[R "U5(a[R"U5OS S .-nSSU0-n [#[R$"U 55eS=pgGMo SSS5 gGN�!,(df  g=f7f)z1Test handling multiple VM operations concurrentlyrrrrrrrNrr")z%(py1)s in %(py3)sr1rXrYrPr!)rr%r&r'r(r)rrr�asyncio�gatherr*r+r,rbrcrdr-r.) rrr0rA�resultsr1r2r4rorps r �test_concurrent_vm_operationsr��s]��� �~� �"3�)2�&�.9���+�*1�*D�*D�a�)H� ���&� �Z�&�)�!4� 5� �I�� �2� 3� �k�F�I�#6� 7� � �  ��� �3�3���F�� %�v�%� %� %� %� %�8� %� %� %�8� %� %� %� %� %� %�v� %� %� %� %�v� %� %� %� %� %� %� %�� &��&� &� &� &� &�9� &� &� &�9� &� &� &� &� &� &�� &� &� &� &�� &� &� &� &� &� &� &�� � �4� � �s0� H�A7H �H�E7H �= H�H � H�H)�builtinsrb�_pytest.assertion.rewrite� assertion�rewriter*�pytest� unittest.mockrrr'�kvm_mcp_serverrrrrr r��fixturerr�markr7r;r=rCrIrqr�r��rr �<module>r�sE���� �*��P�P�������� ������ ����� D�� D������ D�� D������ R�� R������;��;������ :�� :������%1��%1�N�����BA��BA�H�����'��'r

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/steveydevey/kvm-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server