diff mbox series

[bitbake-devel,v6,02/22] hashserv: Add websocket connection implementation

Message ID 20231103142640.1936827-3-JPEWhacker@gmail.com
State New
Headers show
Series Bitbake Hash Server WebSockets, Alternate Database Backend, and User Management | expand

Commit Message

Joshua Watt Nov. 3, 2023, 2:26 p.m. UTC
Adds support to the hash equivalence client and server to communicate
over websockets. Since websockets are message orientated instead of
stream orientated, and new connection class is needed to handle them.

Note that websocket support does require the 3rd party websockets python
module be installed on the host, but it should not be required unless
websockets are actually being used.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 lib/bb/asyncrpc/client.py     | 11 +++++++-
 lib/bb/asyncrpc/connection.py | 44 +++++++++++++++++++++++++++++
 lib/bb/asyncrpc/serv.py       | 53 ++++++++++++++++++++++++++++++++++-
 lib/hashserv/__init__.py      | 13 +++++++++
 lib/hashserv/client.py        |  1 +
 lib/hashserv/tests.py         | 17 +++++++++++
 6 files changed, 137 insertions(+), 2 deletions(-)

Comments

Matthias Schnelte Nov. 10, 2023, 12:03 p.m. UTC | #1
Hi Joshua,

thanks for this change! Being able to use websockets instead of some tcp 
connection would help a lot in cooperate setups which are often 
restricted to only http(s) ports and enforce the use of a cooperate proxy.

Unfortunately the websocket library you are using seems not to support 
websockets over http proxy. At least that is what I understood.

Would it be possible to use another client lib for websockets in order 
to support connection through proxy?

This library seems to support it: 
https://websocket-client.readthedocs.io/en/latest/examples.html#connecting-through-a-proxy

Matthias

On 03.11.23 15:26, Joshua Watt wrote:
> Adds support to the hash equivalence client and server to communicate
> over websockets. Since websockets are message orientated instead of
> stream orientated, and new connection class is needed to handle them.
>
> Note that websocket support does require the 3rd party websockets python
> module be installed on the host, but it should not be required unless
> websockets are actually being used.
>
> Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
> ---
>   lib/bb/asyncrpc/client.py     | 11 +++++++-
>   lib/bb/asyncrpc/connection.py | 44 +++++++++++++++++++++++++++++
>   lib/bb/asyncrpc/serv.py       | 53 ++++++++++++++++++++++++++++++++++-
>   lib/hashserv/__init__.py      | 13 +++++++++
>   lib/hashserv/client.py        |  1 +
>   lib/hashserv/tests.py         | 17 +++++++++++
>   6 files changed, 137 insertions(+), 2 deletions(-)
>
> diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
> index 7f33099b..802c07df 100644
> --- a/lib/bb/asyncrpc/client.py
> +++ b/lib/bb/asyncrpc/client.py
> @@ -10,7 +10,7 @@ import json
>   import os
>   import socket
>   import sys
> -from .connection import StreamConnection, DEFAULT_MAX_CHUNK
> +from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
>   from .exceptions import ConnectionClosedError
>   
>   
> @@ -47,6 +47,15 @@ class AsyncClient(object):
>   
>           self._connect_sock = connect_sock
>   
> +    async def connect_websocket(self, uri):
> +        import websockets
> +
> +        async def connect_sock():
> +            websocket = await websockets.connect(uri, ping_interval=None)
> +            return WebsocketConnection(websocket, self.timeout)
> +
> +        self._connect_sock = connect_sock
> +
>       async def setup_connection(self):
>           # Send headers
>           await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
> diff --git a/lib/bb/asyncrpc/connection.py b/lib/bb/asyncrpc/connection.py
> index c4fd2475..a10628f7 100644
> --- a/lib/bb/asyncrpc/connection.py
> +++ b/lib/bb/asyncrpc/connection.py
> @@ -93,3 +93,47 @@ class StreamConnection(object):
>           if self.writer is not None:
>               self.writer.close()
>               self.writer = None
> +
> +
> +class WebsocketConnection(object):
> +    def __init__(self, socket, timeout):
> +        self.socket = socket
> +        self.timeout = timeout
> +
> +    @property
> +    def address(self):
> +        return ":".join(str(s) for s in self.socket.remote_address)
> +
> +    async def send_message(self, msg):
> +        await self.send(json.dumps(msg))
> +
> +    async def recv_message(self):
> +        m = await self.recv()
> +        return json.loads(m)
> +
> +    async def send(self, msg):
> +        import websockets.exceptions
> +
> +        try:
> +            await self.socket.send(msg)
> +        except websockets.exceptions.ConnectionClosed:
> +            raise ConnectionClosedError("Connection closed")
> +
> +    async def recv(self):
> +        import websockets.exceptions
> +
> +        try:
> +            if self.timeout < 0:
> +                return await self.socket.recv()
> +
> +            try:
> +                return await asyncio.wait_for(self.socket.recv(), self.timeout)
> +            except asyncio.TimeoutError:
> +                raise ConnectionError("Timed out waiting for data")
> +        except websockets.exceptions.ConnectionClosed:
> +            raise ConnectionClosedError("Connection closed")
> +
> +    async def close(self):
> +        if self.socket is not None:
> +            await self.socket.close()
> +            self.socket = None
> diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
> index 3e0d0632..dfb03773 100644
> --- a/lib/bb/asyncrpc/serv.py
> +++ b/lib/bb/asyncrpc/serv.py
> @@ -12,7 +12,7 @@ import signal
>   import socket
>   import sys
>   import multiprocessing
> -from .connection import StreamConnection
> +from .connection import StreamConnection, WebsocketConnection
>   from .exceptions import ClientError, ServerError, ConnectionClosedError
>   
>   
> @@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer):
>           os.unlink(self.path)
>   
>   
> +class WebsocketsServer(object):
> +    def __init__(self, host, port, handler, logger):
> +        self.host = host
> +        self.port = port
> +        self.handler = handler
> +        self.logger = logger
> +
> +    def start(self, loop):
> +        import websockets.server
> +
> +        self.server = loop.run_until_complete(
> +            websockets.server.serve(
> +                self.client_handler,
> +                self.host,
> +                self.port,
> +                ping_interval=None,
> +            )
> +        )
> +
> +        for s in self.server.sockets:
> +            self.logger.debug("Listening on %r" % (s.getsockname(),))
> +
> +            # Enable keep alives. This prevents broken client connections
> +            # from persisting on the server for long periods of time.
> +            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
> +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
> +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
> +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
> +
> +        name = self.server.sockets[0].getsockname()
> +        if self.server.sockets[0].family == socket.AF_INET6:
> +            self.address = "ws://[%s]:%d" % (name[0], name[1])
> +        else:
> +            self.address = "ws://%s:%d" % (name[0], name[1])
> +
> +        return [self.server.wait_closed()]
> +
> +    async def stop(self):
> +        self.server.close()
> +
> +    def cleanup(self):
> +        pass
> +
> +    async def client_handler(self, websocket):
> +        socket = WebsocketConnection(websocket, -1)
> +        await self.handler(socket)
> +
> +
>   class AsyncServer(object):
>       def __init__(self, logger):
>           self.logger = logger
> @@ -190,6 +238,9 @@ class AsyncServer(object):
>       def start_unix_server(self, path):
>           self.server = UnixStreamServer(path, self._client_handler, self.logger)
>   
> +    def start_websocket_server(self, host, port):
> +        self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
> +
>       async def _client_handler(self, socket):
>           try:
>               client = self.accept_client(socket)
> diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
> index 3a401835..56b9c6bc 100644
> --- a/lib/hashserv/__init__.py
> +++ b/lib/hashserv/__init__.py
> @@ -9,11 +9,15 @@ import re
>   import sqlite3
>   import itertools
>   import json
> +from urllib.parse import urlparse
>   
>   UNIX_PREFIX = "unix://"
> +WS_PREFIX = "ws://"
> +WSS_PREFIX = "wss://"
>   
>   ADDR_TYPE_UNIX = 0
>   ADDR_TYPE_TCP = 1
> +ADDR_TYPE_WS = 2
>   
>   UNIHASH_TABLE_DEFINITION = (
>       ("method", "TEXT NOT NULL", "UNIQUE"),
> @@ -84,6 +88,8 @@ def setup_database(database, sync=True):
>   def parse_address(addr):
>       if addr.startswith(UNIX_PREFIX):
>           return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
> +    elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
> +        return (ADDR_TYPE_WS, (addr,))
>       else:
>           m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
>           if m is not None:
> @@ -103,6 +109,9 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
>       (typ, a) = parse_address(addr)
>       if typ == ADDR_TYPE_UNIX:
>           s.start_unix_server(*a)
> +    elif typ == ADDR_TYPE_WS:
> +        url = urlparse(a[0])
> +        s.start_websocket_server(url.hostname, url.port)
>       else:
>           s.start_tcp_server(*a)
>   
> @@ -116,6 +125,8 @@ def create_client(addr):
>       (typ, a) = parse_address(addr)
>       if typ == ADDR_TYPE_UNIX:
>           c.connect_unix(*a)
> +    elif typ == ADDR_TYPE_WS:
> +        c.connect_websocket(*a)
>       else:
>           c.connect_tcp(*a)
>   
> @@ -128,6 +139,8 @@ async def create_async_client(addr):
>       (typ, a) = parse_address(addr)
>       if typ == ADDR_TYPE_UNIX:
>           await c.connect_unix(*a)
> +    elif typ == ADDR_TYPE_WS:
> +        await c.connect_websocket(*a)
>       else:
>           await c.connect_tcp(*a)
>   
> diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
> index 5f7d22ab..9542d72f 100644
> --- a/lib/hashserv/client.py
> +++ b/lib/hashserv/client.py
> @@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client):
>           super().__init__()
>           self._add_methods(
>               "connect_tcp",
> +            "connect_websocket",
>               "get_unihash",
>               "report_unihash",
>               "report_unihash_equiv",
> diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
> index f343c586..01ffd52c 100644
> --- a/lib/hashserv/tests.py
> +++ b/lib/hashserv/tests.py
> @@ -483,3 +483,20 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm
>           # If IPv6 is enabled, it should be safe to use localhost directly, in general
>           # case it is more reliable to resolve the IP address explicitly.
>           return socket.gethostbyname("localhost") + ":0"
> +
> +
> +class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
> +    def setUp(self):
> +        try:
> +            import websockets
> +        except ImportError as e:
> +            self.skipTest(str(e))
> +
> +        super().setUp()
> +
> +    def get_server_addr(self, server_idx):
> +        # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled.
> +        # If IPv6 is enabled, it should be safe to use localhost directly, in general
> +        # case it is more reliable to resolve the IP address explicitly.
> +        host = socket.gethostbyname("localhost")
> +        return "ws://%s:0" % host
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#15423): https://lists.openembedded.org/g/bitbake-devel/message/15423
> Mute This Topic: https://lists.openembedded.org/mt/102364905/7851872
> Group Owner: bitbake-devel+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [develop@schnelte.de]
> -=-=-=-=-=-=-=-=-=-=-=-
>
Joshua Watt Nov. 10, 2023, 2:11 p.m. UTC | #2
On Fri, Nov 10, 2023, 5:03 AM Matthias Schnelte <develop@schnelte.de> wrote:

> Hi Joshua,
>
> thanks for this change! Being able to use websockets instead of some tcp
> connection would help a lot in cooperate setups which are often
> restricted to only http(s) ports and enforce the use of a cooperate proxy.
>
> Unfortunately the websocket library you are using seems not to support
> websockets over http proxy. At least that is what I understood.
>
> Would it be possible to use another client lib for websockets in order
> to support connection through proxy?
>
> This library seems to support it:
>
> https://websocket-client.readthedocs.io/en/latest/examples.html#connecting-through-a-proxy


I'm not sure that's going to work. We need a library that supports asyncio,
and has very minimal dependencies, which the current library satisfies (it
only depends on core Python)


Maybe there is another solution for proxying?



>
> Matthias
>
> On 03.11.23 15:26, Joshua Watt wrote:
> > Adds support to the hash equivalence client and server to communicate
> > over websockets. Since websockets are message orientated instead of
> > stream orientated, and new connection class is needed to handle them.
> >
> > Note that websocket support does require the 3rd party websockets python
> > module be installed on the host, but it should not be required unless
> > websockets are actually being used.
> >
> > Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
> > ---
> >   lib/bb/asyncrpc/client.py     | 11 +++++++-
> >   lib/bb/asyncrpc/connection.py | 44 +++++++++++++++++++++++++++++
> >   lib/bb/asyncrpc/serv.py       | 53 ++++++++++++++++++++++++++++++++++-
> >   lib/hashserv/__init__.py      | 13 +++++++++
> >   lib/hashserv/client.py        |  1 +
> >   lib/hashserv/tests.py         | 17 +++++++++++
> >   6 files changed, 137 insertions(+), 2 deletions(-)
> >
> > diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
> > index 7f33099b..802c07df 100644
> > --- a/lib/bb/asyncrpc/client.py
> > +++ b/lib/bb/asyncrpc/client.py
> > @@ -10,7 +10,7 @@ import json
> >   import os
> >   import socket
> >   import sys
> > -from .connection import StreamConnection, DEFAULT_MAX_CHUNK
> > +from .connection import StreamConnection, WebsocketConnection,
> DEFAULT_MAX_CHUNK
> >   from .exceptions import ConnectionClosedError
> >
> >
> > @@ -47,6 +47,15 @@ class AsyncClient(object):
> >
> >           self._connect_sock = connect_sock
> >
> > +    async def connect_websocket(self, uri):
> > +        import websockets
> > +
> > +        async def connect_sock():
> > +            websocket = await websockets.connect(uri,
> ping_interval=None)
> > +            return WebsocketConnection(websocket, self.timeout)
> > +
> > +        self._connect_sock = connect_sock
> > +
> >       async def setup_connection(self):
> >           # Send headers
> >           await self.socket.send("%s %s" % (self.proto_name,
> self.proto_version))
> > diff --git a/lib/bb/asyncrpc/connection.py
> b/lib/bb/asyncrpc/connection.py
> > index c4fd2475..a10628f7 100644
> > --- a/lib/bb/asyncrpc/connection.py
> > +++ b/lib/bb/asyncrpc/connection.py
> > @@ -93,3 +93,47 @@ class StreamConnection(object):
> >           if self.writer is not None:
> >               self.writer.close()
> >               self.writer = None
> > +
> > +
> > +class WebsocketConnection(object):
> > +    def __init__(self, socket, timeout):
> > +        self.socket = socket
> > +        self.timeout = timeout
> > +
> > +    @property
> > +    def address(self):
> > +        return ":".join(str(s) for s in self.socket.remote_address)
> > +
> > +    async def send_message(self, msg):
> > +        await self.send(json.dumps(msg))
> > +
> > +    async def recv_message(self):
> > +        m = await self.recv()
> > +        return json.loads(m)
> > +
> > +    async def send(self, msg):
> > +        import websockets.exceptions
> > +
> > +        try:
> > +            await self.socket.send(msg)
> > +        except websockets.exceptions.ConnectionClosed:
> > +            raise ConnectionClosedError("Connection closed")
> > +
> > +    async def recv(self):
> > +        import websockets.exceptions
> > +
> > +        try:
> > +            if self.timeout < 0:
> > +                return await self.socket.recv()
> > +
> > +            try:
> > +                return await asyncio.wait_for(self.socket.recv(),
> self.timeout)
> > +            except asyncio.TimeoutError:
> > +                raise ConnectionError("Timed out waiting for data")
> > +        except websockets.exceptions.ConnectionClosed:
> > +            raise ConnectionClosedError("Connection closed")
> > +
> > +    async def close(self):
> > +        if self.socket is not None:
> > +            await self.socket.close()
> > +            self.socket = None
> > diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
> > index 3e0d0632..dfb03773 100644
> > --- a/lib/bb/asyncrpc/serv.py
> > +++ b/lib/bb/asyncrpc/serv.py
> > @@ -12,7 +12,7 @@ import signal
> >   import socket
> >   import sys
> >   import multiprocessing
> > -from .connection import StreamConnection
> > +from .connection import StreamConnection, WebsocketConnection
> >   from .exceptions import ClientError, ServerError, ConnectionClosedError
> >
> >
> > @@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer):
> >           os.unlink(self.path)
> >
> >
> > +class WebsocketsServer(object):
> > +    def __init__(self, host, port, handler, logger):
> > +        self.host = host
> > +        self.port = port
> > +        self.handler = handler
> > +        self.logger = logger
> > +
> > +    def start(self, loop):
> > +        import websockets.server
> > +
> > +        self.server = loop.run_until_complete(
> > +            websockets.server.serve(
> > +                self.client_handler,
> > +                self.host,
> > +                self.port,
> > +                ping_interval=None,
> > +            )
> > +        )
> > +
> > +        for s in self.server.sockets:
> > +            self.logger.debug("Listening on %r" % (s.getsockname(),))
> > +
> > +            # Enable keep alives. This prevents broken client
> connections
> > +            # from persisting on the server for long periods of time.
> > +            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
> > +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
> > +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
> > +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
> > +
> > +        name = self.server.sockets[0].getsockname()
> > +        if self.server.sockets[0].family == socket.AF_INET6:
> > +            self.address = "ws://[%s]:%d" % (name[0], name[1])
> > +        else:
> > +            self.address = "ws://%s:%d" % (name[0], name[1])
> > +
> > +        return [self.server.wait_closed()]
> > +
> > +    async def stop(self):
> > +        self.server.close()
> > +
> > +    def cleanup(self):
> > +        pass
> > +
> > +    async def client_handler(self, websocket):
> > +        socket = WebsocketConnection(websocket, -1)
> > +        await self.handler(socket)
> > +
> > +
> >   class AsyncServer(object):
> >       def __init__(self, logger):
> >           self.logger = logger
> > @@ -190,6 +238,9 @@ class AsyncServer(object):
> >       def start_unix_server(self, path):
> >           self.server = UnixStreamServer(path, self._client_handler,
> self.logger)
> >
> > +    def start_websocket_server(self, host, port):
> > +        self.server = WebsocketsServer(host, port,
> self._client_handler, self.logger)
> > +
> >       async def _client_handler(self, socket):
> >           try:
> >               client = self.accept_client(socket)
> > diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
> > index 3a401835..56b9c6bc 100644
> > --- a/lib/hashserv/__init__.py
> > +++ b/lib/hashserv/__init__.py
> > @@ -9,11 +9,15 @@ import re
> >   import sqlite3
> >   import itertools
> >   import json
> > +from urllib.parse import urlparse
> >
> >   UNIX_PREFIX = "unix://"
> > +WS_PREFIX = "ws://"
> > +WSS_PREFIX = "wss://"
> >
> >   ADDR_TYPE_UNIX = 0
> >   ADDR_TYPE_TCP = 1
> > +ADDR_TYPE_WS = 2
> >
> >   UNIHASH_TABLE_DEFINITION = (
> >       ("method", "TEXT NOT NULL", "UNIQUE"),
> > @@ -84,6 +88,8 @@ def setup_database(database, sync=True):
> >   def parse_address(addr):
> >       if addr.startswith(UNIX_PREFIX):
> >           return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
> > +    elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
> > +        return (ADDR_TYPE_WS, (addr,))
> >       else:
> >           m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
> >           if m is not None:
> > @@ -103,6 +109,9 @@ def create_server(addr, dbname, *, sync=True,
> upstream=None, read_only=False):
> >       (typ, a) = parse_address(addr)
> >       if typ == ADDR_TYPE_UNIX:
> >           s.start_unix_server(*a)
> > +    elif typ == ADDR_TYPE_WS:
> > +        url = urlparse(a[0])
> > +        s.start_websocket_server(url.hostname, url.port)
> >       else:
> >           s.start_tcp_server(*a)
> >
> > @@ -116,6 +125,8 @@ def create_client(addr):
> >       (typ, a) = parse_address(addr)
> >       if typ == ADDR_TYPE_UNIX:
> >           c.connect_unix(*a)
> > +    elif typ == ADDR_TYPE_WS:
> > +        c.connect_websocket(*a)
> >       else:
> >           c.connect_tcp(*a)
> >
> > @@ -128,6 +139,8 @@ async def create_async_client(addr):
> >       (typ, a) = parse_address(addr)
> >       if typ == ADDR_TYPE_UNIX:
> >           await c.connect_unix(*a)
> > +    elif typ == ADDR_TYPE_WS:
> > +        await c.connect_websocket(*a)
> >       else:
> >           await c.connect_tcp(*a)
> >
> > diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
> > index 5f7d22ab..9542d72f 100644
> > --- a/lib/hashserv/client.py
> > +++ b/lib/hashserv/client.py
> > @@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client):
> >           super().__init__()
> >           self._add_methods(
> >               "connect_tcp",
> > +            "connect_websocket",
> >               "get_unihash",
> >               "report_unihash",
> >               "report_unihash_equiv",
> > diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
> > index f343c586..01ffd52c 100644
> > --- a/lib/hashserv/tests.py
> > +++ b/lib/hashserv/tests.py
> > @@ -483,3 +483,20 @@ class
> TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm
> >           # If IPv6 is enabled, it should be safe to use localhost
> directly, in general
> >           # case it is more reliable to resolve the IP address
> explicitly.
> >           return socket.gethostbyname("localhost") + ":0"
> > +
> > +
> > +class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup,
> HashEquivalenceCommonTests, unittest.TestCase):
> > +    def setUp(self):
> > +        try:
> > +            import websockets
> > +        except ImportError as e:
> > +            self.skipTest(str(e))
> > +
> > +        super().setUp()
> > +
> > +    def get_server_addr(self, server_idx):
> > +        # Some hosts cause asyncio module to misbehave, when IPv6 is
> not enabled.
> > +        # If IPv6 is enabled, it should be safe to use localhost
> directly, in general
> > +        # case it is more reliable to resolve the IP address explicitly.
> > +        host = socket.gethostbyname("localhost")
> > +        return "ws://%s:0" % host
> >
> > -=-=-=-=-=-=-=-=-=-=-=-
> > Links: You receive all messages sent to this group.
> > View/Reply Online (#15423):
> https://lists.openembedded.org/g/bitbake-devel/message/15423
> > Mute This Topic: https://lists.openembedded.org/mt/102364905/7851872
> > Group Owner: bitbake-devel+owner@lists.openembedded.org
> > Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [
> develop@schnelte.de]
> > -=-=-=-=-=-=-=-=-=-=-=-
> >
>
Matthias Schnelte Nov. 15, 2023, 7:44 a.m. UTC | #3
Hi Joshua,

what we are using currently is a http tunnel to be able to connect to 
the hashserver which is running on azure infrastructure. The hashserver 
connection in the local.conf is then connecting to the localhost port of 
the httptunnel.

The tunnel is started whenever bitbake is called. This is done by 
sourcing our own environment that replaces the bitbake command with a 
small script that opens the tunnel and then call the original bitbake. I 
just came across  'addhandler' - maybe this would have been a better 
integrated solution.

But this is some project specific plumping - not sure if one could make 
this into a more generic solution.

On 10.11.23 15:11, Joshua Watt wrote:
>
>
> On Fri, Nov 10, 2023, 5:03 AM Matthias Schnelte <develop@schnelte.de> 
> wrote:
>
>     Hi Joshua,
>
>     thanks for this change! Being able to use websockets instead of
>     some tcp
>     connection would help a lot in cooperate setups which are often
>     restricted to only http(s) ports and enforce the use of a
>     cooperate proxy.
>
>     Unfortunately the websocket library you are using seems not to
>     support
>     websockets over http proxy. At least that is what I understood.
>
>     Would it be possible to use another client lib for websockets in
>     order
>     to support connection through proxy?
>
>     This library seems to support it:
>     https://websocket-client.readthedocs.io/en/latest/examples.html#connecting-through-a-proxy
>
>
> I'm not sure that's going to work. We need a library that supports 
> asyncio, and has very minimal dependencies, which the current library 
> satisfies (it only depends on core Python)
>
>
> Maybe there is another solution for proxying?
>
>
>
>
>     Matthias
>
>     On 03.11.23 15:26, Joshua Watt wrote:
>     > Adds support to the hash equivalence client and server to
>     communicate
>     > over websockets. Since websockets are message orientated instead of
>     > stream orientated, and new connection class is needed to handle
>     them.
>     >
>     > Note that websocket support does require the 3rd party
>     websockets python
>     > module be installed on the host, but it should not be required
>     unless
>     > websockets are actually being used.
>     >
>     > Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
>     > ---
>     >   lib/bb/asyncrpc/client.py     | 11 +++++++-
>     >   lib/bb/asyncrpc/connection.py | 44 +++++++++++++++++++++++++++++
>     >   lib/bb/asyncrpc/serv.py       | 53
>     ++++++++++++++++++++++++++++++++++-
>     >   lib/hashserv/__init__.py      | 13 +++++++++
>     >   lib/hashserv/client.py        |  1 +
>     >   lib/hashserv/tests.py         | 17 +++++++++++
>     >   6 files changed, 137 insertions(+), 2 deletions(-)
>     >
>     > diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
>     > index 7f33099b..802c07df 100644
>     > --- a/lib/bb/asyncrpc/client.py
>     > +++ b/lib/bb/asyncrpc/client.py
>     > @@ -10,7 +10,7 @@ import json
>     >   import os
>     >   import socket
>     >   import sys
>     > -from .connection import StreamConnection, DEFAULT_MAX_CHUNK
>     > +from .connection import StreamConnection, WebsocketConnection,
>     DEFAULT_MAX_CHUNK
>     >   from .exceptions import ConnectionClosedError
>     >
>     >
>     > @@ -47,6 +47,15 @@ class AsyncClient(object):
>     >
>     >           self._connect_sock = connect_sock
>     >
>     > +    async def connect_websocket(self, uri):
>     > +        import websockets
>     > +
>     > +        async def connect_sock():
>     > +            websocket = await websockets.connect(uri,
>     ping_interval=None)
>     > +            return WebsocketConnection(websocket, self.timeout)
>     > +
>     > +        self._connect_sock = connect_sock
>     > +
>     >       async def setup_connection(self):
>     >           # Send headers
>     >           await self.socket.send("%s %s" % (self.proto_name,
>     self.proto_version))
>     > diff --git a/lib/bb/asyncrpc/connection.py
>     b/lib/bb/asyncrpc/connection.py
>     > index c4fd2475..a10628f7 100644
>     > --- a/lib/bb/asyncrpc/connection.py
>     > +++ b/lib/bb/asyncrpc/connection.py
>     > @@ -93,3 +93,47 @@ class StreamConnection(object):
>     >           if self.writer is not None:
>     >               self.writer.close()
>     >               self.writer = None
>     > +
>     > +
>     > +class WebsocketConnection(object):
>     > +    def __init__(self, socket, timeout):
>     > +        self.socket = socket
>     > +        self.timeout = timeout
>     > +
>     > +    @property
>     > +    def address(self):
>     > +        return ":".join(str(s) for s in self.socket.remote_address)
>     > +
>     > +    async def send_message(self, msg):
>     > +        await self.send(json.dumps(msg))
>     > +
>     > +    async def recv_message(self):
>     > +        m = await self.recv()
>     > +        return json.loads(m)
>     > +
>     > +    async def send(self, msg):
>     > +        import websockets.exceptions
>     > +
>     > +        try:
>     > +            await self.socket.send(msg)
>     > +        except websockets.exceptions.ConnectionClosed:
>     > +            raise ConnectionClosedError("Connection closed")
>     > +
>     > +    async def recv(self):
>     > +        import websockets.exceptions
>     > +
>     > +        try:
>     > +            if self.timeout < 0:
>     > +                return await self.socket.recv()
>     > +
>     > +            try:
>     > +                return await
>     asyncio.wait_for(self.socket.recv(), self.timeout)
>     > +            except asyncio.TimeoutError:
>     > +                raise ConnectionError("Timed out waiting for data")
>     > +        except websockets.exceptions.ConnectionClosed:
>     > +            raise ConnectionClosedError("Connection closed")
>     > +
>     > +    async def close(self):
>     > +        if self.socket is not None:
>     > +            await self.socket.close()
>     > +            self.socket = None
>     > diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
>     > index 3e0d0632..dfb03773 100644
>     > --- a/lib/bb/asyncrpc/serv.py
>     > +++ b/lib/bb/asyncrpc/serv.py
>     > @@ -12,7 +12,7 @@ import signal
>     >   import socket
>     >   import sys
>     >   import multiprocessing
>     > -from .connection import StreamConnection
>     > +from .connection import StreamConnection, WebsocketConnection
>     >   from .exceptions import ClientError, ServerError,
>     ConnectionClosedError
>     >
>     >
>     > @@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer):
>     >           os.unlink(self.path)
>     >
>     >
>     > +class WebsocketsServer(object):
>     > +    def __init__(self, host, port, handler, logger):
>     > +        self.host = host
>     > +        self.port = port
>     > +        self.handler = handler
>     > +        self.logger = logger
>     > +
>     > +    def start(self, loop):
>     > +        import websockets.server
>     > +
>     > +        self.server = loop.run_until_complete(
>     > +            websockets.server.serve(
>     > +                self.client_handler,
>     > +                self.host,
>     > +                self.port,
>     > +                ping_interval=None,
>     > +            )
>     > +        )
>     > +
>     > +        for s in self.server.sockets:
>     > +            self.logger.debug("Listening on %r" %
>     (s.getsockname(),))
>     > +
>     > +            # Enable keep alives. This prevents broken client
>     connections
>     > +            # from persisting on the server for long periods of
>     time.
>     > +            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
>     > +            s.setsockopt(socket.IPPROTO_TCP,
>     socket.TCP_KEEPIDLE, 30)
>     > +            s.setsockopt(socket.IPPROTO_TCP,
>     socket.TCP_KEEPINTVL, 15)
>     > +            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
>     > +
>     > +        name = self.server.sockets[0].getsockname()
>     > +        if self.server.sockets[0].family == socket.AF_INET6:
>     > +            self.address = "ws://[%s]:%d" % (name[0], name[1])
>     > +        else:
>     > +            self.address = "ws://%s:%d" % (name[0], name[1])
>     > +
>     > +        return [self.server.wait_closed()]
>     > +
>     > +    async def stop(self):
>     > +        self.server.close()
>     > +
>     > +    def cleanup(self):
>     > +        pass
>     > +
>     > +    async def client_handler(self, websocket):
>     > +        socket = WebsocketConnection(websocket, -1)
>     > +        await self.handler(socket)
>     > +
>     > +
>     >   class AsyncServer(object):
>     >       def __init__(self, logger):
>     >           self.logger = logger
>     > @@ -190,6 +238,9 @@ class AsyncServer(object):
>     >       def start_unix_server(self, path):
>     >           self.server = UnixStreamServer(path,
>     self._client_handler, self.logger)
>     >
>     > +    def start_websocket_server(self, host, port):
>     > +        self.server = WebsocketsServer(host, port,
>     self._client_handler, self.logger)
>     > +
>     >       async def _client_handler(self, socket):
>     >           try:
>     >               client = self.accept_client(socket)
>     > diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
>     > index 3a401835..56b9c6bc 100644
>     > --- a/lib/hashserv/__init__.py
>     > +++ b/lib/hashserv/__init__.py
>     > @@ -9,11 +9,15 @@ import re
>     >   import sqlite3
>     >   import itertools
>     >   import json
>     > +from urllib.parse import urlparse
>     >
>     >   UNIX_PREFIX = "unix://"
>     > +WS_PREFIX = "ws://"
>     > +WSS_PREFIX = "wss://"
>     >
>     >   ADDR_TYPE_UNIX = 0
>     >   ADDR_TYPE_TCP = 1
>     > +ADDR_TYPE_WS = 2
>     >
>     >   UNIHASH_TABLE_DEFINITION = (
>     >       ("method", "TEXT NOT NULL", "UNIQUE"),
>     > @@ -84,6 +88,8 @@ def setup_database(database, sync=True):
>     >   def parse_address(addr):
>     >       if addr.startswith(UNIX_PREFIX):
>     >           return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
>     > +    elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
>     > +        return (ADDR_TYPE_WS, (addr,))
>     >       else:
>     >           m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
>     >           if m is not None:
>     > @@ -103,6 +109,9 @@ def create_server(addr, dbname, *,
>     sync=True, upstream=None, read_only=False):
>     >       (typ, a) = parse_address(addr)
>     >       if typ == ADDR_TYPE_UNIX:
>     >           s.start_unix_server(*a)
>     > +    elif typ == ADDR_TYPE_WS:
>     > +        url = urlparse(a[0])
>     > +        s.start_websocket_server(url.hostname, url.port)
>     >       else:
>     >           s.start_tcp_server(*a)
>     >
>     > @@ -116,6 +125,8 @@ def create_client(addr):
>     >       (typ, a) = parse_address(addr)
>     >       if typ == ADDR_TYPE_UNIX:
>     >           c.connect_unix(*a)
>     > +    elif typ == ADDR_TYPE_WS:
>     > +        c.connect_websocket(*a)
>     >       else:
>     >           c.connect_tcp(*a)
>     >
>     > @@ -128,6 +139,8 @@ async def create_async_client(addr):
>     >       (typ, a) = parse_address(addr)
>     >       if typ == ADDR_TYPE_UNIX:
>     >           await c.connect_unix(*a)
>     > +    elif typ == ADDR_TYPE_WS:
>     > +        await c.connect_websocket(*a)
>     >       else:
>     >           await c.connect_tcp(*a)
>     >
>     > diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
>     > index 5f7d22ab..9542d72f 100644
>     > --- a/lib/hashserv/client.py
>     > +++ b/lib/hashserv/client.py
>     > @@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client):
>     >           super().__init__()
>     >           self._add_methods(
>     >               "connect_tcp",
>     > +            "connect_websocket",
>     >               "get_unihash",
>     >               "report_unihash",
>     >               "report_unihash_equiv",
>     > diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
>     > index f343c586..01ffd52c 100644
>     > --- a/lib/hashserv/tests.py
>     > +++ b/lib/hashserv/tests.py
>     > @@ -483,3 +483,20 @@ class
>     TestHashEquivalenceTCPServer(HashEquivalenceTestSetup,
>     HashEquivalenceComm
>     >           # If IPv6 is enabled, it should be safe to use
>     localhost directly, in general
>     >           # case it is more reliable to resolve the IP address
>     explicitly.
>     >           return socket.gethostbyname("localhost") + ":0"
>     > +
>     > +
>     > +class
>     TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup,
>     HashEquivalenceCommonTests, unittest.TestCase):
>     > +    def setUp(self):
>     > +        try:
>     > +            import websockets
>     > +        except ImportError as e:
>     > +            self.skipTest(str(e))
>     > +
>     > +        super().setUp()
>     > +
>     > +    def get_server_addr(self, server_idx):
>     > +        # Some hosts cause asyncio module to misbehave, when
>     IPv6 is not enabled.
>     > +        # If IPv6 is enabled, it should be safe to use
>     localhost directly, in general
>     > +        # case it is more reliable to resolve the IP address
>     explicitly.
>     > +        host = socket.gethostbyname("localhost")
>     > +        return "ws://%s:0" % host
>     >
>     >
>     >
>
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#15501):https://lists.openembedded.org/g/bitbake-devel/message/15501
> Mute This Topic:https://lists.openembedded.org/mt/102364905/7851872
> Group Owner:bitbake-devel+owner@lists.openembedded.org
> Unsubscribe:https://lists.openembedded.org/g/bitbake-devel/unsub  [develop@schnelte.de]
> -=-=-=-=-=-=-=-=-=-=-=-
>
diff mbox series

Patch

diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index 7f33099b..802c07df 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -10,7 +10,7 @@  import json
 import os
 import socket
 import sys
-from .connection import StreamConnection, DEFAULT_MAX_CHUNK
+from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
 from .exceptions import ConnectionClosedError
 
 
@@ -47,6 +47,15 @@  class AsyncClient(object):
 
         self._connect_sock = connect_sock
 
+    async def connect_websocket(self, uri):
+        import websockets
+
+        async def connect_sock():
+            websocket = await websockets.connect(uri, ping_interval=None)
+            return WebsocketConnection(websocket, self.timeout)
+
+        self._connect_sock = connect_sock
+
     async def setup_connection(self):
         # Send headers
         await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
diff --git a/lib/bb/asyncrpc/connection.py b/lib/bb/asyncrpc/connection.py
index c4fd2475..a10628f7 100644
--- a/lib/bb/asyncrpc/connection.py
+++ b/lib/bb/asyncrpc/connection.py
@@ -93,3 +93,47 @@  class StreamConnection(object):
         if self.writer is not None:
             self.writer.close()
             self.writer = None
+
+
+class WebsocketConnection(object):
+    def __init__(self, socket, timeout):
+        self.socket = socket
+        self.timeout = timeout
+
+    @property
+    def address(self):
+        return ":".join(str(s) for s in self.socket.remote_address)
+
+    async def send_message(self, msg):
+        await self.send(json.dumps(msg))
+
+    async def recv_message(self):
+        m = await self.recv()
+        return json.loads(m)
+
+    async def send(self, msg):
+        import websockets.exceptions
+
+        try:
+            await self.socket.send(msg)
+        except websockets.exceptions.ConnectionClosed:
+            raise ConnectionClosedError("Connection closed")
+
+    async def recv(self):
+        import websockets.exceptions
+
+        try:
+            if self.timeout < 0:
+                return await self.socket.recv()
+
+            try:
+                return await asyncio.wait_for(self.socket.recv(), self.timeout)
+            except asyncio.TimeoutError:
+                raise ConnectionError("Timed out waiting for data")
+        except websockets.exceptions.ConnectionClosed:
+            raise ConnectionClosedError("Connection closed")
+
+    async def close(self):
+        if self.socket is not None:
+            await self.socket.close()
+            self.socket = None
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 3e0d0632..dfb03773 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -12,7 +12,7 @@  import signal
 import socket
 import sys
 import multiprocessing
-from .connection import StreamConnection
+from .connection import StreamConnection, WebsocketConnection
 from .exceptions import ClientError, ServerError, ConnectionClosedError
 
 
@@ -178,6 +178,54 @@  class UnixStreamServer(StreamServer):
         os.unlink(self.path)
 
 
+class WebsocketsServer(object):
+    def __init__(self, host, port, handler, logger):
+        self.host = host
+        self.port = port
+        self.handler = handler
+        self.logger = logger
+
+    def start(self, loop):
+        import websockets.server
+
+        self.server = loop.run_until_complete(
+            websockets.server.serve(
+                self.client_handler,
+                self.host,
+                self.port,
+                ping_interval=None,
+            )
+        )
+
+        for s in self.server.sockets:
+            self.logger.debug("Listening on %r" % (s.getsockname(),))
+
+            # Enable keep alives. This prevents broken client connections
+            # from persisting on the server for long periods of time.
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
+
+        name = self.server.sockets[0].getsockname()
+        if self.server.sockets[0].family == socket.AF_INET6:
+            self.address = "ws://[%s]:%d" % (name[0], name[1])
+        else:
+            self.address = "ws://%s:%d" % (name[0], name[1])
+
+        return [self.server.wait_closed()]
+
+    async def stop(self):
+        self.server.close()
+
+    def cleanup(self):
+        pass
+
+    async def client_handler(self, websocket):
+        socket = WebsocketConnection(websocket, -1)
+        await self.handler(socket)
+
+
 class AsyncServer(object):
     def __init__(self, logger):
         self.logger = logger
@@ -190,6 +238,9 @@  class AsyncServer(object):
     def start_unix_server(self, path):
         self.server = UnixStreamServer(path, self._client_handler, self.logger)
 
+    def start_websocket_server(self, host, port):
+        self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
+
     async def _client_handler(self, socket):
         try:
             client = self.accept_client(socket)
diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index 3a401835..56b9c6bc 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -9,11 +9,15 @@  import re
 import sqlite3
 import itertools
 import json
+from urllib.parse import urlparse
 
 UNIX_PREFIX = "unix://"
+WS_PREFIX = "ws://"
+WSS_PREFIX = "wss://"
 
 ADDR_TYPE_UNIX = 0
 ADDR_TYPE_TCP = 1
+ADDR_TYPE_WS = 2
 
 UNIHASH_TABLE_DEFINITION = (
     ("method", "TEXT NOT NULL", "UNIQUE"),
@@ -84,6 +88,8 @@  def setup_database(database, sync=True):
 def parse_address(addr):
     if addr.startswith(UNIX_PREFIX):
         return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
+    elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
+        return (ADDR_TYPE_WS, (addr,))
     else:
         m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
         if m is not None:
@@ -103,6 +109,9 @@  def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         s.start_unix_server(*a)
+    elif typ == ADDR_TYPE_WS:
+        url = urlparse(a[0])
+        s.start_websocket_server(url.hostname, url.port)
     else:
         s.start_tcp_server(*a)
 
@@ -116,6 +125,8 @@  def create_client(addr):
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         c.connect_unix(*a)
+    elif typ == ADDR_TYPE_WS:
+        c.connect_websocket(*a)
     else:
         c.connect_tcp(*a)
 
@@ -128,6 +139,8 @@  async def create_async_client(addr):
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         await c.connect_unix(*a)
+    elif typ == ADDR_TYPE_WS:
+        await c.connect_websocket(*a)
     else:
         await c.connect_tcp(*a)
 
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index 5f7d22ab..9542d72f 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -115,6 +115,7 @@  class Client(bb.asyncrpc.Client):
         super().__init__()
         self._add_methods(
             "connect_tcp",
+            "connect_websocket",
             "get_unihash",
             "report_unihash",
             "report_unihash_equiv",
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index f343c586..01ffd52c 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -483,3 +483,20 @@  class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm
         # If IPv6 is enabled, it should be safe to use localhost directly, in general
         # case it is more reliable to resolve the IP address explicitly.
         return socket.gethostbyname("localhost") + ":0"
+
+
+class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
+    def setUp(self):
+        try:
+            import websockets
+        except ImportError as e:
+            self.skipTest(str(e))
+
+        super().setUp()
+
+    def get_server_addr(self, server_idx):
+        # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled.
+        # If IPv6 is enabled, it should be safe to use localhost directly, in general
+        # case it is more reliable to resolve the IP address explicitly.
+        host = socket.gethostbyname("localhost")
+        return "ws://%s:0" % host