diff --git a/asynci/core/rpc.py b/asynci/core/rpc.py new file mode 100644 index 0000000..3f96336 --- /dev/null +++ b/asynci/core/rpc.py @@ -0,0 +1,74 @@ +import abc +import asyncio +import struct + + +class __InternalProtocol(asyncio.Protocol): + __reader:asyncio.StreamReader + + def __init__(self, reader:asyncio.StreamReader): + assert isinstance(reader, asyncio.StreamReader) + self.__reader = reader + + async def _drain_helper(self): + pass + + +class __InternalTransport(asyncio.WriteTransport): + __close: bool + __reader: asyncio.StreamReader + + def __init__(self, reader: asyncio.StreamReader): + self.__close = True + assert isinstance(reader, asyncio.StreamReader) + self.__reader = reader + + def close(self): + self.__close = False + self.__reader.feed_eof() + + def is_closing(self): + return not self.__close + + def write(self, data: bytes): + self.__reader.feed_data(data) + + +def create_internal_stream(loop=None): + if loop is None: + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader(loop=None) + internal_protocol = __InternalProtocol(reader) + internal_transport = __InternalTransport(reader) + writer = asyncio.StreamWriter(internal_transport, internal_protocol, reader=reader, loop=loop) + return reader, writer + + +class BaseConnectionServer(abc.ABC): + __in:asyncio.StreamReader + __out:asyncio.StreamWriter + __size_format = struct.Struct("!I") + + def __init__(self, in_io:asyncio.StreamReader, out_io:asyncio.StreamWriter): + if not isinstance(in_io, asyncio.StreamReader): + raise TypeError("in_io have to be a stream reader.") + if not isinstance(out_io, asyncio.StreamWriter): + raise TypeError("out_io have to be a stream writer.") + self.__in, self.__out = in_io, out_io + + async def init(self): + # Send own config + self.__out.write(self.__size_format.pack(0)) + await self.__out.drain() + + # Read other config + size = self.__size_format.unpack(await self.__in.readexactly(self.__size_format.size))[0] + await self.__in.readexactly(size) + + # Send applied config + self.__out.write(self.__size_format.pack(0)) + await self.__out.drain() + + # Read other applied config + size = self.__size_format.unpack(await self.__in.readexactly(self.__size_format.size))[0] + await self.__in.readexactly(size) diff --git a/tests/asynci_core_rpc.py b/tests/asynci_core_rpc.py new file mode 100644 index 0000000..88b871f --- /dev/null +++ b/tests/asynci_core_rpc.py @@ -0,0 +1,43 @@ +from asynci.core import rpc +import asyncio +import unittest + + +class TestInternal(unittest.IsolatedAsyncioTestCase): + async def test_create(self): + reader, writer = rpc.create_internal_stream() + + async def test_close_read(self): + reader, writer = rpc.create_internal_stream() + writer.close() + await writer.drain() + self.assertEqual(await reader.read(10), b"") + + async def test_write(self): + reader, writer = rpc.create_internal_stream() + writer.write(b"abc") + writer.close() + await writer.drain() + self.assertEqual(await reader.read(), b"abc") + + async def test_writelines(self): + reader, writer = rpc.create_internal_stream() + writer.writelines([b"a", b"b"]) + writer.close() + await writer.drain() + self.assertEqual(await reader.read(), b"ab") + + +class _BCS(rpc.BaseConnectionServer): + pass + + +class TestBaseConnectionServer(unittest.IsolatedAsyncioTestCase): + async def test_create(self): + reader, writer = rpc.create_internal_stream() + _BCS(reader, writer) + + async def test_init(self): + reader, writer = rpc.create_internal_stream() + bcs = _BCS(reader, writer) + await bcs.init()