Помощь в исправлении ошибки asyncpg

30 000 руб. за проект
17 мая 2023, 13:27 • 10 откликов • 149 просмотров
Ссылка на код: https://pastebin.com/qtQyhAmn (вырван из контекста, код запускается в боте на aiogram)
Python 3.11

Этот код реализует параллельное выполнение задач в Python, используя мультипроцессинг и многопоточность. Сначала создается пул процессов, каждый из которых имеет свой собственный пул потоков и свой собственный событийный цикл asyncio. Затем задачи могут быть отправлены на выполнение в эти процессы, и они будут выполняться асинхронно.

Вот подробнее, что происходит в каждом из основных блоков кода:

  1. _proc_initializer: Это функция инициализации для каждого процесса в пуле процессов. Она создает новый событийный цикл asyncio, запускает его в отдельном потоке и создает пул потоков для выполнения задач.
  2. _proc_executor: Это пул процессов. При создании каждый процесс инициализируется с помощью функции _proc_initializer.
  3. _run_in_thread: Эта функция принимает корутину и запускает ее в потоке из пула потоков текущего процесса.
  4. _run_coro: Эта функция создает обертку вокруг корутины, которая будет выполнена в процессе. Эта обертка передает сигнал через канал перед выполнением корутины.
  5. _run_in_process: Эта функция запускает корутину в процессе, добавляет обратный вызов для обработки результата и возвращает конец канала для прослушивания результата.
  6. submit: Эта функция принимает корутину и аргументы, запускает ее в процессе и возвращает результат. Если произошла ошибка, она будет поднята здесь. В этом методе также используется ThreadPoolExecutor для ожидания результата выполнения задачи.
Итак, в целом, этот код создает сложную структуру параллельного выполнения, которая может эффективно использовать ресурсы машины для выполнения большого количества задач.

Код рабочий, я его использовал для одного бота, там все работает без нареканий (но иногда ошибка возникала), но затем я интегрировал этот код в другого бота, на моей локальной машине тоже все работает без нареканий, но на сервере появляется ошибка, которая ниже, причем если перезагружать бота много раз, ошибка может пропасть и все будет нормально работать, хочется стабильного решения, чтобы я не перезагружал бота по много раз, чтобы это решение начало работать, мне нужно избавиться от этой ошибки, желательно оставить ваши контакты для связи, в приоритете telegram

Traceback (most recent call last):

File "/bot/app/misc/db.py", line 391, in get_active_tracks_ebay
return await con.fetch(sql)
│ │ └ '\n SELECT * FROM running_trackers_ebay;\n '
│ └ <function Connection.fetch at 0x7f72f2830180>
└ <PoolConnectionProxy [released] 0x7f72f1ef3940>

File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 620, in fetch
return await self._execute(
│ └ <function Connection._execute at 0x7f72f2825580>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 1658, in _execute
result, _ = await self.__execute(
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 1683, in __execute
return await self._do_execute(
│ └ <function Connection._do_execute at 0x7f72f2825760>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 1710, in _do_execute
stmt = await self._get_statement(
│ └ <function Connection._get_statement at 0x7f72f2813880>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 397, in _get_statement
statement = await self._protocol.prepare(
│ └ <member '_protocol' of 'Connection' objects>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "asyncpg/protocol/protocol.pyx", line 168, in prepare
return await waiter

RuntimeError: Task <Task pending name='Task-59' coro=<_run_coro.<locals>.wrapper() running at /bot/app/worker/process_pool.py:42> cb=[_chain_future.<locals>._call_set_state() at /usr/local/lib/python3.11/asyncio/futures.py:394]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

File "/usr/local/lib/python3.11/threading.py", line 995, in _bootstrap
self._bootstrap_inner()
│ └ <function Thread._bootstrap_inner at 0x7f72f499af20>
└ <Thread(Thread-1 (run_forever), started daemon 140131636160256)>
File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
│ └ <function Thread.run at 0x7f72f499ac00>
└ <Thread(Thread-1 (run_forever), started daemon 140131636160256)>
File "/usr/local/lib/python3.11/threading.py", line 975, in run
self._target(*self._args, **self._kwargs)
│ │ │ │ │ └ {}
│ │ │ │ └ <Thread(Thread-1 (run_forever), started daemon 140131636160256)>
│ │ │ └ ()
│ │ └ <Thread(Thread-1 (run_forever), started daemon 140131636160256)>
│ └ <built-in method run_forever of Loop object at 0x558f18be4130>
└ <Thread(Thread-1 (run_forever), started daemon 140131636160256)>

> File "/bot/app/worker/process_pool.py", line 60, in task_done_callback
task["result"] = ft.result()
│ │ └ <method 'result' of '_asyncio.Future' objects>
│ └ <Future finished exception=InterfaceError('cannot perform operation: another operation is in progress')>
└ {'task_id': '<function _run_coro.<locals>.wrapper at 0x7f72f1e96160>', 'exception': InterfaceError('cannot perform operation:...

File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
│ │ └ None
│ └ None
└ None

File "/bot/app/worker/process_pool.py", line 36, in _run_in_thread
return ft.result()
│ └ <function Future.result at 0x7f72f49c4220>
└ <Future at 0x7f72f1ee3350 state=finished raised InterfaceError>

File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
└ None
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
└ None

File "/bot/app/worker/process_pool.py", line 42, in wrapper
return await coro()
└ functools.partial(<function run_check_tracker at 0x7f72f25251c0>)

File "/bot/app/misc/check_old_trackers.py", line 29, in run_check_tracker
active_tracks = await db.get_active_tracks_ebay()
│ └ <function Database.get_active_tracks_ebay at 0x7f72f28340e0>
└ <app.misc.db.Database object at 0x7f72f1fb67d0>

File "/bot/app/misc/db.py", line 390, in get_active_tracks_ebay
async with self.pool.acquire() as con:
│ │ │ └ <PoolConnectionProxy [released] 0x7f72f1ef3940>
│ │ └ <function Pool.acquire at 0x7f72f2832200>
│ └ <asyncpg.pool.Pool object at 0x7f72f1efc6c0>
└ <app.misc.db.Database object at 0x7f72f1fb67d0>

File "/usr/local/lib/python3.11/site-packages/asyncpg/pool.py", line 220, in release
raise ex
File "/usr/local/lib/python3.11/site-packages/asyncpg/pool.py", line 210, in release
await self._con.reset(timeout=budget)
│ │ └ None
│ └ <member '_con' of 'PoolConnectionHolder' objects>
└ <asyncpg.pool.PoolConnectionHolder object at 0x7f72f20026c0>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 1366, in reset
await self.execute(reset_query, timeout=timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <function Connection.execute at 0x7f72f2813740>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 317, in execute
return await self._protocol.query(query, timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <member '_protocol' of 'Connection' objects>
└ <asyncpg.connection.Connection object at 0x7f72f1e83680>
File "asyncpg/protocol/protocol.pyx", line 323, in query
self._check_state()
File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
raise apg_exc.InterfaceError(
│ └ <class 'asyncpg.exceptions._base.InterfaceError'>
└ <module 'asyncpg.exceptions' from '/usr/local/lib/python3.11/site-packages/asyncpg/exceptions/__init__.py'>

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
Task exception was never retrieved
future: <Task finished name='Task-76' coro=<submit() done, defined at /bot/app/worker/process_pool.py:75> exception=InterfaceError('cannot perform operation: another operation is in progress')>
Traceback (most recent call last):
File "/bot/app/worker/process_pool.py", line 83, in submit
raise task["exception"]
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress