0
0 комментариев

По совету @andreymal из моего предыдущего вопроса:

Если разными частями приложения являются разные процессы, то для этого надо организовывать межпроцессное взаимодействие, чтобы один процесс работал с соединениями, а другие процессы отправляли этому процессу сообщения. Можно для этого написать свой велосипед на сокетах, можно для этого использовать готовые решения вроде RabbitMQ и Redis.

Пытаюсь реализовать межпроцессное взаимодействие при помощи celery.

Исходные данные и какую задачу нужно решить:
К серверу по вебсокетам должны подключаться клиенты (люди, использующие браузер) и ожидать сообщений от сервера.
На сервере время от времени срабатывает процесс, генерирующий это самое сообщение. Это сообщение (которое нужно отдать клиенту) можно попросить у процесса передать в указанный скрипт.

Что для этого сделано:

  • Обработчик сокет-соединений на Python tornado;
  • .py скрипт, которому на stdin подается сообщение от процесса, который генерирует само сообщение;
  • функция-таск для celery, которая должна отправить сообщение в браузер.

То есть, процесс генерирует сообщение -> вызывает скрипт, передавая ему сообщение на stdin -> скрипт вызывает celery таск -> таск берет массив соединений и рассылает им сообщение.

Суть проблемы:
Сокет сервер складывает все соединения в массив, но этот массив оказывается пустым для celery таска.

Собственно, вопрос: как завести один массив на несколько процессов, или где я налажал, и как это делать правильно?

UPD добавил упрощенный код для понимания того, как и что есть.
Сокет-сервер на tornado:

class SocketHandler(tornado.websocket.WebSocketHandler):
    connections = []
 
    def open(self):
        self.__class__.connections.append(self)
 
    @classmethod
    def get_connections(cls):
        return cls.connections

и celery таск, который должен отправлять сообщения клиентам:

from handlers import SocketHandler
 
@celery.task
def send_msg(msg):
    for conn in SocketHandler.get_connections():
        conn.write_message(msg)

Но список полученный путем SocketHandler.get_connections() в файле с celery тасками оказывается пустым.
Запускаю все это в разных терминалах так:

(venv)$ python app.py  # это торнадо апп, импортирующий приведенный handlers
(venv)$ celery -A tasks worker


Добавить комментарий