"""Tests for ``connector.create_internal_stream`` and ``connector.BaseConnectionServer``."""

import asyncio
import unittest

from asynci.core import connector


class TestInternal(unittest.IsolatedAsyncioTestCase):
    """Exercise the reader/writer pair returned by ``create_internal_stream``."""

    async def test_create(self):
        # Only checks that the factory returns something unpackable into
        # exactly two objects; neither end is used afterwards.
        _reader, _writer = connector.create_internal_stream()

    async def test_close_read(self):
        # Closing the writer without writing anything must make a bounded
        # read on the paired reader return an empty bytes object.
        stream_in, stream_out = connector.create_internal_stream()
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read(10)
        self.assertEqual(received, b"")

    async def test_write(self):
        # Bytes written before close must come back from an unbounded read.
        stream_in, stream_out = connector.create_internal_stream()
        stream_out.write(b"abc")
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read()
        self.assertEqual(received, b"abc")

    async def test_writelines(self):
        # ``writelines`` must deliver its chunks concatenated, in order.
        stream_in, stream_out = connector.create_internal_stream()
        chunks = [b"a", b"b"]
        stream_out.writelines(chunks)
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read()
        self.assertEqual(received, b"ab")


class _BCS(connector.BaseConnectionServer):
    """``BaseConnectionServer`` subclass used as the subject of the tests below.

    Its ``remote_opend`` hook unconditionally raises ``NotImplementedError``.
    """

    async def remote_opend(
        self, read: asyncio.StreamReader, write: asyncio.StreamWriter
    ):
        raise NotImplementedError("remote_opend isn't implemented.")


class TestBaseConnectionServer(unittest.IsolatedAsyncioTestCase):
    """Construction, ``init``, and ``close`` tests for ``_BCS`` instances."""

    async def test_create(self):
        # Construction alone must not raise.
        stream_in, stream_out = connector.create_internal_stream()
        _BCS(stream_in, stream_out)

    async def test_init(self):
        # ``init`` is awaited on a server whose reader and writer come from
        # the same ``create_internal_stream`` call.
        stream_in, stream_out = connector.create_internal_stream()
        server = _BCS(stream_in, stream_out)
        await server.init()

    async def _gen_bcss(self, init=True):
        """Build two ``_BCS`` servers wired to each other.

        Two stream pairs are created; each server gets the reader of one pair
        and the writer of the other, so what one server writes is what the
        other server reads.

        Args:
            init: When true, ``init()`` of both servers is started as
                concurrent tasks and both tasks are awaited before returning.

        Returns:
            A ``(bcs1, bcs2)`` tuple of the two servers.
        """
        # First pair carries server 1 -> server 2, second pair the reverse.
        second_in, first_out = connector.create_internal_stream()
        first_in, second_out = connector.create_internal_stream()
        first = _BCS(first_in, first_out)
        second = _BCS(second_in, second_out)

        if not init:
            return first, second

        # Both init() calls are scheduled before either one is awaited.
        pending = [asyncio.create_task(server.init()) for server in (first, second)]
        for task in pending:
            await task
        return first, second

    async def test_init_dyn(self):
        # Same handshake as ``_gen_bcss(init=True)``, driven from the test
        # itself on servers that were built without initialisation.
        first, second = await self._gen_bcss(init=False)
        pending = [asyncio.create_task(server.init()) for server in (first, second)]
        for task in pending:
            await task

    async def test_close(self):
        # Start from two initialised, connected servers.
        first, second = await self._gen_bcss()

        # Launch both server loops as background tasks.
        runners = [
            asyncio.create_task(server.run_server()) for server in (first, second)
        ]

        # Closing only the first server; the test then waits for both
        # ``run_server`` tasks to finish.
        await first.close()
        for runner in runners:
            await runner