"""Tests for ``rpc.create_internal_stream`` and ``rpc.BaseConnectionServer``."""
from asynci.core import rpc
import asyncio
import unittest


class TestInternal(unittest.IsolatedAsyncioTestCase):
    """Exercise the (reader, writer) pair from ``rpc.create_internal_stream``."""

    async def test_create(self):
        # Only checks that the factory returns exactly two items.
        _reader, _writer = rpc.create_internal_stream()

    async def test_close_read(self):
        # With nothing written, a closed and drained writer should leave
        # the reader returning an empty bytes object.
        stream_in, stream_out = rpc.create_internal_stream()
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read(10)
        self.assertEqual(received, b"")

    async def test_write(self):
        # A single write() followed by close/drain must be readable in full.
        stream_in, stream_out = rpc.create_internal_stream()
        stream_out.write(b"abc")
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read()
        self.assertEqual(received, b"abc")

    async def test_writelines(self):
        # writelines() chunks are expected to arrive concatenated, in order.
        stream_in, stream_out = rpc.create_internal_stream()
        stream_out.writelines([b"a", b"b"])
        stream_out.close()
        await stream_out.drain()
        received = await stream_in.read()
        self.assertEqual(received, b"ab")


class _BCS(rpc.BaseConnectionServer):
    """Bare subclass of ``rpc.BaseConnectionServer`` used by the tests below."""


class TestBaseConnectionServer(unittest.IsolatedAsyncioTestCase):
    """Smoke tests for constructing and initialising a ``_BCS`` instance."""

    async def test_create(self):
        # Construction from a stream pair must not raise.
        stream_in, stream_out = rpc.create_internal_stream()
        _BCS(stream_in, stream_out)

    async def test_init(self):
        # Awaiting init() on a freshly built server must not raise.
        stream_in, stream_out = rpc.create_internal_stream()
        server = _BCS(stream_in, stream_out)
        await server.init()