Error asyncio.exceptions.InvalidStateError: invalid state on connection_lost #36

Open
golubovai opened this issue Aug 19, 2024 · 5 comments


@golubovai

We get the following error on a production server:

Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 69, in uvloop.loop.Handle._run
File "uvloop/handles/basetransport.pyx", line 169, in uvloop.loop.UVBaseTransport._call_connection_lost
File "asynctnt/iproto/coreproto.pyx", line 185, in asynctnt.iproto.protocol.CoreProtocol.connection_lost
File "asynctnt/iproto/protocol.pyx", line 413, in asynctnt.iproto.protocol.BaseProtocol._on_connection_lost
File "/usr/local/lib/python3.12/site-packages/asynctnt/connection.py", line 174, in connection_lost
self._disconnect_waiter.set_result(True)
asyncio.exceptions.InvalidStateError: invalid state

After this, no new connections could be established.

version 2.2.1
python 3.12

Maybe this change will be sufficient:

if self._disconnect_waiter: -> if self._disconnect_waiter and not self._disconnect_waiter.done():
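
The failure and the proposed guard can be illustrated with plain asyncio, independent of asynctnt. This is a minimal sketch: `resolve_waiter` is a hypothetical helper name that stands in for the guarded body of connection_lost, not a function from the library.

```python
import asyncio


def resolve_waiter(waiter):
    """Resolve a waiter future only if it is still pending.

    Hypothetical helper mirroring the guard proposed above.
    Returns True if this call set the result, False otherwise.
    """
    if waiter is not None and not waiter.done():
        waiter.set_result(True)
        return True
    return False


def demo():
    loop = asyncio.new_event_loop()
    try:
        waiter = loop.create_future()
        waiter.set_result(True)  # first resolution succeeds

        # A second, unguarded set_result raises InvalidStateError.
        try:
            waiter.set_result(True)
            unguarded_raised = False
        except asyncio.InvalidStateError:
            unguarded_raised = True

        # The guarded version is a no-op on an already-done future.
        guarded_set = resolve_waiter(waiter)
        return unguarded_raised, guarded_set
    finally:
        loop.close()
```

The guard only suppresses the exception. It does not explain why the waiter was already resolved (or cancelled) when connection_lost ran.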

@igorcoding
Owner

Hi! If possible, could you provide some more info on how this can be reproduced? Maybe some code snippets showing how you set up the connection, or the general connection lifecycle in your app? I understand that it could be hard to reproduce, but there may still be some clues.

@golubovai
Author

Hi! I'll try.

We use an async connection pool (asyncio_connection_pool) with asynctnt to reuse connections and speed up interaction.

The connection pool strategy looks like this:

from asyncio_connection_pool import ConnectionStrategy
from asynctnt import Connection

class ConnectionPoolStrategy(ConnectionStrategy[Connection]):
    def __init__(
        self,
        connection_config: ConnectionConfig,
        connection_pool_config: ConnectionPoolConfig,
    ):
        self._connection_config = connection_config
        self._connection_pool_config = connection_pool_config

    async def make_connection(self):
        return await Connection(
            host=self._connection_config.host,
            port=self._connection_config.port,
            username=self._connection_config.username,
            password=self._connection_config.password,
            fetch_schema=False,
            auto_refetch_schema=False,
            connect_timeout=self._connection_pool_config.connect_timeout,
            reconnect_timeout=self._connection_pool_config.reconnect_timeout,
            ping_timeout=self._connection_pool_config.ping_timeout,
        ).connect()

    def connection_is_closed(self, conn):
        return not conn.is_connected

    async def close_connection(self, conn):
        await conn.disconnect()

asyncio_connection_pool doesn't have a timeout option for the get_connection function.
Therefore we wrap its async context manager to add this functionality:

    @asynccontextmanager
    async def get_connection(self):
        mgr = self._connection_pool.get_connection()
        aexit = type(mgr).__aexit__
        aenter = type(mgr).__aenter__
        timeout = self.get_connection_pool_config().connect_timeout
        try:
            # Only the acquisition itself runs under the timeout
            async with async_timeout(timeout):
                conn = await aenter(mgr)
            try:
                yield conn
            except Exception:
                if not await aexit(mgr, *sys.exc_info()):
                    raise
            else:
                await aexit(mgr, None, None, None)
        except (TimeoutError, asyncio.TimeoutError):
            raise TimeoutError(f'Connection timeout: {timeout}')

This wrapper is then used in the standard way:

async with self.get_connection() as conn:
    pass
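
For anyone trying to reproduce this without the real pool, the pattern above (bounding only the acquisition step with a timeout) can be sketched in a self-contained way. `FakePool` and `get_connection_with_timeout` are hypothetical names for illustration, and `asyncio.wait_for` is used here in place of the `async_timeout` helper from the snippet above. This is not the asyncio_connection_pool API.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for a pool whose get_connection() may block."""

    def __init__(self, delay):
        self._delay = delay
        self.released = 0

    @asynccontextmanager
    async def get_connection(self):
        await asyncio.sleep(self._delay)  # simulate waiting for a free slot
        try:
            yield "conn"
        finally:
            self.released += 1  # connection handed back to the pool


@asynccontextmanager
async def get_connection_with_timeout(pool, timeout):
    mgr = pool.get_connection()
    try:
        # Only acquisition is bounded by the timeout, not the body.
        conn = await asyncio.wait_for(mgr.__aenter__(), timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Connection timeout: {timeout}") from None
    try:
        yield conn
    except BaseException as exc:
        # BaseException, so a cancelled body also releases the connection.
        if not await mgr.__aexit__(type(exc), exc, exc.__traceback__):
            raise
    else:
        await mgr.__aexit__(None, None, None)


async def main():
    async with get_connection_with_timeout(FakePool(delay=0), 0.5) as conn:
        return conn
```

Running `asyncio.run(main())` acquires and releases one fake connection. With a pool delay longer than the timeout, the wrapper raises TimeoutError before the body is entered.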

@golubovai
Author

Configuration values are:
connect_timeout = 0.5
reconnect_timeout = None
ping_timeout = 0

@golubovai
Author

golubovai commented Aug 20, 2024

I found this place in the source, connection.py:
except asyncio.TimeoutError:  # pragma: nocover
    tr.close()
    continue  # try again

What if, while tr.close() is still in progress (the docs say it works asynchronously), we have already created a new protocol and called disconnect on it?
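
One way to picture the suspected interleaving is a toy model in plain asyncio. This is only a sketch of the hypothesis, not asynctnt's actual code: `FakeConnection`, `start_disconnect`, and `simulate` are hypothetical names, and `loop.call_soon` stands in for a transport delivering connection_lost some time after close() was called.

```python
import asyncio


class FakeConnection:
    """Hypothetical model of a connection that owns one disconnect waiter."""

    def __init__(self, loop):
        self._loop = loop
        self._disconnect_waiter = None
        self.errors = []

    def start_disconnect(self):
        # disconnect() is assumed to create a waiter that
        # connection_lost later resolves.
        self._disconnect_waiter = self._loop.create_future()

    def connection_lost(self):
        # Unguarded, like the line shown in the traceback.
        try:
            if self._disconnect_waiter:
                self._disconnect_waiter.set_result(True)
        except asyncio.InvalidStateError as exc:
            self.errors.append(exc)  # recorded instead of crashing the demo


async def simulate():
    loop = asyncio.get_running_loop()
    conn = FakeConnection(loop)

    # Attempt 1 timed out: closing its transport only schedules
    # connection_lost for a later loop iteration.
    loop.call_soon(conn.connection_lost)

    # Attempt 2 succeeds, then a disconnect is requested and the
    # second transport also reports connection_lost.
    conn.start_disconnect()
    loop.call_soon(conn.connection_lost)

    await asyncio.sleep(0)  # let both scheduled callbacks run
    return len(conn.errors)
```

In this model the late callback from the first transport resolves the waiter, so the callback from the second transport finds it already done. Whether the real library can reach this state is exactly the open question.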

@igorcoding
Owner

Hi @golubovai! Sorry for the delay, I got carried away with other stuff. I tried to reproduce the error, but still no luck :(
Instead I ran several "thought experiments", and maybe (maybe) the problem lies in the not-so-transparent state logic. Could you try replacing the connection_is_closed method with the following:

    def connection_is_closed(self, conn: asynctnt.Connection):
        return conn.state == ConnectionState.DISCONNECTED

conn.state seems to reflect much more accurately what the end user (you) expects when asking whether the connection is connected or disconnected.
So if the errors still happen, please try replacing this function and let me know how it goes.
