LINUX.ORG.RU

Треды, прерывание


0

1

Посоветуйте паттерн. Суть:

Управляющий поток (обсчитывает полученные данные)
|
+-- Рабочий процесс 1 (сетевой сокет)
|   |
|   +-- Вспомогательный процесс (таймер-пинговалка)
|
+-- Рабочий процесс 2 (сетевой сокет)
    |
    +-- Вспомогательный процесс (таймер-пинговалка)

В любом из потомков управляющего потока может возникнуть исключение. Например, рабочие процессы получили с двух серверов данные, управляющий поток их забрал и вычисляет разницу для последующей синхронизации. В этот момент просыпается одна из пинговалок и обнаруживает, что удаленный сервер закрыл сетевой сокет. В этот момент надо прервать выполнение всего приложения (позакрывать еще открытые сокеты, потушить рабочие потоки и вспомогательные потоки, после чего погасить управляющий поток и выйти, распечатав прилетевшее исключение.

Как All делает прерывание выполнения потока команд в управляющем потоке, если, предположим, он только что вошел в функцию о сотне строк и в данный момент находится на первой из них (до последней доползет через часик, но данные не актуальны уже сейчас - соединения накрылись).

Городить лапшу из однострочных методов, главной фишкой которых будет проверить очередь ошибок перед собственно выполнением задачи не хотелось бы.

Посоветуйте паттерн

Я советую тебе больше не писать программы. Никогда

anonymous
()

Я слабо уловил суть проблемы, и дурацкая диаграмма мне ни о чем не говорит.

Но, что-то мне подсказыват, что ты банально не осилил RAII

anonymous
()

Какой язык то? Можно и сигнал бросить, а в хендлере все позакрывать и умереть.

x0r ★★★★★
()

В этот момент просыпается одна из пинговалок и обнаруживает, что удаленный сервер закрыл сетевой сокет

Какой-то дебильный способ проверять жизнеспособность сокета, если честно. Ты точно знаешь про select/poll/epoll/kqueue/.../asio?

anonymous
()

Как All делает прерывание выполнения потока команд в управляющем потоке, если, предположим, он только что вошел в функцию о сотне строк и в данный момент находится на первой из них (до последней доползет через часик, но данные не актуальны уже сейчас - соединения накрылись).

Что-то мне подсказывает, что у тебя какой-то архитектурный косяк. Если главный поток только и делает, что ждет, пока к нему не придут данные на обработку от дочерних потоков (кстати, каким образом? локальный сокет, очередь?), то почему бы просто не разнообразить команды от рабочих потоков управляющему новым типом? Словил вместо данных такой пакет, завершился, профит.

anonymous
()

Это легко делается на F#. Там есть монада асинхронных вычислений Async (на продолжениях). Так вот, там отмена вычислений есть «из коробки».

Посылается сигнал отмены, и как только вычисление в любом месте, а вычисление может быть распараллелено, доходит до очередного Bind (он же, известен как >>=), соответствующая ветка вычислений будет прервана. И так для всего вычисления.

Вопрос только в том, подходит ли тебе F# :)

dave ★★★★★
()
Ответ на: комментарий от anonymous

Какой-то дебильный способ проверять жизнеспособность сокета, если честно. Ты точно знаешь про select/poll/epoll/kqueue/.../asio?

heartbeat? Нормальный способ. Если пакеты, разрывающие соединение, пролюбились, селект скажет о том, что соедиение накрылось где-то через 2 часа

AptGet ★★★
()
Ответ на: комментарий от dave

Что важно, все блоки finally отрабатываются при такой экстренной отмене вычисления. Поэтому ресурсы освобождаются правильно. И все это при минимуме программирования и даже понимания того, как это все внутри работает :)

dave ★★★★★
()
Ответ на: комментарий от AptGet

heartbeat? Нормальный способ. Если пакеты, разрывающие соединение, пролюбились, селект скажет о том, что соедиение накрылось где-то через 2 часа

Как они пролюбятся, если tcp ганартирует доставку?

В случае udp - да, heartbeat-ы с watchdog-ами решают

anonymous
()
Ответ на: комментарий от anonymous

Как они пролюбятся, если tcp ганартирует доставку?

tcp не гарантирует, что с той стороны сервер внезнапно не ушел в kernel panic, мыши не погрызли оптику, бомжи не сдали на стекло трансатлантический кабель. Так что да, в локалочке и на обратной петле - select() ok, а в больших интернетах лучше на гарантированность не надеяться

AptGet ★★★
()
Ответ на: комментарий от anonymous

Чувак, ты открыл мне глаза!

Я знал! Скажи мужикам, которые пихают в свои протоколы heartbeat, что он там не нужен.

AptGet ★★★
()
Ответ на: комментарий от AptGet

Пытаюсь вспомнить, сколько я ждал выхода ssh после разрыва соединения с инетом... Причём этот ssh по ctrl-c не прерывался. Есть там в параметрах heartbeat (например раз в минуту)?

gag ★★★★★
()
Ответ на: комментарий от gag

Есть там в параметрах heartbeat (например раз в минуту)?

Да, см ServerAliveInterval, ServerAliveCountMax в конфиге

AptGet ★★★
()

Приоткрою немного завесу тайны: синхронизируются IMAP-ящики. Смотрел на треды в питоне и жабе. В обоих метод stop() у классов Thread выпилили, хотя он тоже не был серебряной пулей.

Наш весельчак в маске может спать спокойно: для него я ничего писать не собирался, а на досуге может просто слегка посмотреть на например imaplib.IMAP4, в котором исключения любят броать по любому чиху. Более того, сервера IMAP тоже любят взбрыкивать по любому поводу, и исключительная ситуация, описанная в первом посте - это «последняя капля», таких ситуаций может быть дохрена: сервер может внезапно решить, что войти в «папку» я не могу, либо она внезапно стала read-only, либо внезапно у сообщения на сервере-источнике образовался флаг, не поддерживаемый сервером-назначением, можно сколько угодно продолжать. Любая из перечисленных ситуаций должна приводить к полной остановке работы программы (один запуск синхронизялки - один аккаунт).

И да, я знаю про верблюжий imapsync, но, кроме огромных аппетитов к оперативе, он более ничем не примечателен, не смотря на обилие опций.

GateKeeper ★★
() автор топика

В идеале бы поиметь некий способ проинжектить исключение из одного потока в другой, но получить рабочий код на питоне 2.7.2 с использованием cpython не получилось (постоянно ругается на неправильный thread id).

GateKeeper ★★
() автор топика
Ответ на: комментарий от GateKeeper

Ну так лови исключение в рабочем потоке, маршалль его в какую-нибудь внутренюю команду и шли родительскому потоку, в чём проблема-то?

anonymous
()
Ответ на: комментарий от GateKeeper

В обоих метод stop() у классов Thread выпилили, хотя он тоже не был серебряной пулей.

зато в gevent оно есть.

true_admin ★★★★★
()

Лично я разделил сетевой уровень и приложения. В результате, в случае если сетевой уровень обнаружил проблему, он или вызывает коллбэк или дёргает событие или кладёт новое событие в очередь. А там уж задача «треда» делать или не делать по этому поводу.

Вообще, без goto грустно делать «откаты».

true_admin ★★★★★
()

Да уж, тред просто переполняется революционными идеями. Особенно понравилось желание ТС плодить аж по два (!) потока на сокет. Что так мало-то?

Если по делу - используйте select/epoll/etc. Не напрямую, а через twisted, естественно. Насколько я помню, там и IMAP реализован.

shylent
()
Ответ на: комментарий от shylent

Ок. Следал select. Еще даже пять раз успел сделать. Но хитрожопый IMAP-сервер сказал BYE и обрубил соединение принудительно, т.к. я, мудак с точки зрения сервера, сижу на сокете и ничего не делаю. Рассказывай свою историю успеха уже, как ты заставил epoll/select/etc'ом IMAP-сервер сжалиться над тобой и подождать твоих августейших команд еще немного.

GateKeeper ★★
() автор топика
Ответ на: комментарий от GateKeeper

Вы бы хоть кусок кода показали или, там, ссылку на код дали. А то, пардон, сплошные эмоции и иносказания, а по делу - ничего, в общем-то. Как тут можно что-то посоветовать?

И потоки с процессами путаете.

shylent
()
Ответ на: комментарий от shylent

Кусок какого кода? Я ж спрашиваю, как и где возможна реализация.

Вот тут проверял:

def raise_async(tid, exctype):
  res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  print("Result: {0}, target: {1}".format(res, tid))

class T(threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
  def setContext(self, ctx):
    self.context = ctx
  def raiseExc(self, exctype):
    raise_async(self.ident, Exception)
  def run(self):
    print("Child ident: {0}".format(self.ident))
    time.sleep(15)
    self.context.raiseExc(Exception)

class M(threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.child = T()
  def run(self):
    print("Main ident: {0}".format(self.ident))
    self.child.setContext(self)
    self.child.start()
    time.sleep(20)
  def raiseExc(self, exctype):
    raise_async(self.ident, Exception)

def check():
  m = M()
  m.start()
  c = 1
  while m.isAlive() or m.child.isAlive():
    ms = "alive" if m.isAlive() else "dead"
    ts = "alive" if m.child.isAlive() else "dead"
    print("{:>2}|m: {}, t: {}".format(c, ms, ts))
    time.sleep(1)
    c += 1
Судя по глобальной и надежной, дочерний поток должен воткнуть исключение в родительский поток и прервать его выполнение.

GateKeeper ★★
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.