LINUX.ORG.RU

Вопрос по непонятому из C++20 короутин

 ,


0

3

Доброго дня!

В очередной раз пытаюсь разобраться с короутинами из C++20. Среди того, что не понимаю, есть один момент, который можно обозначить как «идеология обращения с асинхронными сущностями». Попробую пояснить о чем речь.

Большинство примеров использования короутин демонстрируют co_await в одну операцию. Что-то вроде:

auto socket = co_await socket_factory::connect(ip);
co_await socket.send(request_packet);
auto reply = co_await socket.receive();
...

Но, насколько я смог понять, в реальности за co_await something() стоит два шага:

  1. Вызов something() и получение некого Task-а как результат формирования короутины something (под Task-ом понимается тип возвращаемого something() значения, это может быть и не Task, а какой-нибудь generator или еще что-то, что хранит в себе coroutine_handle из something()).
  2. Применение оператора co_await к возвращенному из something() Task-у. С последующей цепочкой вызовов await_ready/await_suspend/await_resume для Awaiter-а (или самого Task-а, если отдельного Awaiter-а нет).

Поэтому, при желании, программист может переписать строчку co_await socket_factory::connect(ip); в более развернутом виде:

// Получили в свое распоряжение короутину connect.
auto connect_task = socket_factory::connect(ip);
... // Тут курим бамбук.
// И только сейчас толкаем ее на выполнение.
auto socket = co_await connect_task;

Теперь к сути вопроса.

Допустим, у меня есть очередь сообщений и я хочу сделать к ней async_receive, который бы представлял из себя короутину. Т.е. что-то вроде:

async_receive_task_t async_receive(message_queue_t queue) {
  ... // Какие-то действия с co_yield/co_return.
}

И чтобы к этому async_receive можно было применять co_await:

auto msg = co_await async_receive(commands_queue);

Сейчас у меня есть мысль, что в async_receive сразу при входе можно делать попытку чтения из очереди. И если очередь не пуста, то немедленно доставать сообщение оттуда.

Такой «энергичный» подход вроде как хорош тем, что не нужно тратить время если очередь сообщений не пуста. Сразу же внутри async_receive получаем сообщение, наш awaiter_t об этом узнает и вернет true из await_ready. Соответственно, не придется делать попытку приостановить ту короутину, в которой обратились к async_receive.

Однако, меня смущает потенциальная возможность вот какого сценария:

auto receive_coro = async_receive(command_queue); // (1)
// Теперь у нас на руках есть короутина async_receive, но в приостановленном
// состоянии.
...
... // Еще какие-то действия.
...
if(something_went_wrong) return; // Дальше не идем.

// Только сейчас нам нужно сообщение из очереди.
use(co_await receive_coro); // (2)

При «энергичном» подходе если в command_queue было сообщение, то это сообщение будет извлечено в точке (1). Однако, если дело до точки (2) не дойдет, то сообщение будет потеряно.

Что мне кажется неправильным. Ведь co_await-а не было. А значит и явной попытки взять сообщение в обработку не было.

Возможно, правильным был бы «ленивый» подход. Сама короутина async_receive создавалась бы в приостановленном состоянии, а первая попытка чтения из очереди была бы в awaiter_t::await_suspend. И чтобы awaiter_t::await_suspend возвращал true только если очередь пуста и нужно приостанавливаться до появления в ней сообщений (соответственно, если в awaiter_t::await_suspend сообщение удалось взять, то возвращается false).

Чтобы было еще более понятно, допустим, что есть синхронный аналог, sync_receive, который всегда возвращает взятое из очереди сообщение (и блокируется если очередь пуста). В случае синхронного кода можно написать:

auto msg = sync_receive(queue);
log(debug, "msg extracted, type={}", msg->type());
...
if(something_went_wrong) return;
...
log(debug, "start processing msg");
use(msg);
log(debug, "complete processing msg");

И по логам мы точно отследим когда сообщение было взято из очереди. Даже если оно не было обработано из-за раннего return-а, то судьба сообщения становится понятна.

В случае же с async_receive написать в таком же стиле не получится. Т.е.:

auto receive_coro = async_receive(queue); // (1)
...
if(something_went_wrong) return;
...
auto msg = co_await receive_coro;
log(debug, "msg extracted, type={}", msg->type()); // (3)
log(debug, "start processing msg");
use(msg);
log(debug, "complete processing msg");

Если сообщение реально было изъято из очереди в точке (1), но до точки (3) мы не дошли, то следов сообщения у нас не будет. Получится, что иногда сообщения теряются, но непонятно где и как.

Собственно весь мой вопрос – это попытка понять, как в мире современного С++ принято относится к коду вида:

auto task = some_call(); // (1)
...
auto result = co_await task; // (2)

Я вижу два возможных и вполне себе логично обоснованных варианта:

  1. В точке (1) некая операция инициируется и выполняется параллельно пока основной код идет из (1) в (2). В точке (2) мы либо забираем уже готовый результат операции, начатой в (1), либо приостанавливаемся, если результата еще не было.
  2. В точке (1) мы только создаем ресурсы для некой операции, но сама операция еще не начата. Операция начинается в точке (2) и, если сразу ее результат получить не получится, то нас приостановят пока этот самый результат не готов.

И нужно выбрать какой из этих вариантов взять за основу для async_receive чтобы это вызывало как можно меньше «WTF?»

★★★★★
auto receive_coro = async_receive(command_queue); // (1)

// Теперь у нас на руках есть короутина async_receive, но в приостановленном

// состоянии.

...

... // Еще какие-то действия.

...

if(something_went_wrong) return; // Дальше не идем.

// Только сейчас нам нужно сообщение из очереди.

use(co_await receive_coro); // (2)

При «энергичном» подходе если в command_queue было сообщение, то это сообщение будет извлечено в точке (1). Однако, если дело до точки (2) не дойдет, то сообщение будет потеряно.

Может я чего-то не понимаю, но async_receive должен только конструировать объект Awaitable, запомнить ссылку на очередь, но ничего с ней не делать.

Вся движуха начнётся при co_await receive_coro. Awaitable::await_ready сходит по ссылке в очередь и проверит есть ли там элемент. Если элемента нет, то Awaitable::await_suspend подпишет корутину на событие о появлении элемента в очереди. Awaitable::await_resume - только здесь из очереди будет извлечён элемент.

Таким образом до вызова co_await ничего с очередью не происходит. Ты просто создашь корутину, а если она не понадобилась, то выкинешь.

ox55ff ★★★★★
()
Ответ на: комментарий от ox55ff

Может я чего-то не понимаю, но async_receive должен только конструировать объект Awaitable, запомнить ссылку на очередь, но ничего с ней не делать.

Спасибо за ваше мнение.

Чем больше думаю, тем больше сам склоняюсь к этому варианту.

eao197 ★★★★★
() автор топика

самые нормальные кораутины это в boost::asio всё остальное задротство, в том числе и стандартные c++20.

anonymous2 ★★★★★
()

ты co_await делаешь не на таске, а на awaitable, это вообще может быть не корутина; если awatable в await_ready вернет true, то никакого await вообще не случится

Reset ★★★★★
()
Последнее исправление: Reset (всего исправлений: 1)
Ответ на: комментарий от anonymous2

за использование блоатваре полагается расстрел!

Reset ★★★★★
()

я кстати так и не понял что именно не понятно?

Reset ★★★★★
()
Ответ на: комментарий от Reset

ты co_await делаешь не на таске, а на awaitable, это вообще может быть не корутина;

если awatable в await_ready вернет true, то никакого await вообще не случится

Я в курсе. Но вопрос не в этом.

я кстати так и не понял что именно не понятно?

Для гипотетического async_receive попытку взять сообщение из очереди нужно делать внутри вызова async_receive или внутри co_await (await_ready/await_suspend)?

eao197 ★★★★★
() автор топика

Поэтому, при желании, программист может переписать строчку co_await socket_factory::connect(ip); в более развернутом виде:

нет, нельзя

на операторе co_await, будет точка разрыва фрейма

а в вашем примере вы рисуете доп функцию которая будет до точки разрыва фрейма это не одно и тоже

вот кстати есть такой сайт если не знали https://cppinsights.io/s/5ad71aed

Допустим, у меня есть очередь сообщений и я хочу сделать к ней async_receive, который бы представлял из себя короутину. Т.е. что-то вроде:

смысл async_receive вернуть сообщение из очереди а не вернуть сообщение или пустое сообщение в случае пустого сообщение она спит и ожидает ивента о том что будет сообщение, обработать и вернуть сообщение на выходе co_await

В случае же с async_receive написать в таком же стиле не получится. Т.е.:

Если сообщение реально было изъято из очереди в точке (1), но до точки (3) мы не дошли, то следов сообщения у нас не будет. Получится, что иногда сообщения теряются, но непонятно где и как.

я выше объяснил

Собственно весь мой вопрос – это попытка понять, как в мире современного С++ принято относится к коду вида:

я изначально сказал что это два разных кода которые вы приводите

причем последний, это ужас ужас

ps очень сложно написано, я много раз перечитывал пытаясь понять что же вы спрашиваете

поэтому ответил как понял

voipdev
()
Ответ на: комментарий от voipdev

нет, нельзя

Позволю себе усомниться.

на операторе co_await, будет точка разрыва фрейма

И?

вот кстати есть такой сайт если не знали https://cppinsights.io/s/5ad71aed

Все тоже самое с подробными объяснениями описано, например, вот здесь.

смысл async_receive вернуть сообщение из очереди а не вернуть сообщение или пустое сообщение в случае пустого сообщение она спит и ожидает ивента о том что будет сообщение, обработать и вернуть сообщение на выходе co_await

async_receive можно сделать, как минимум, двумя способами:

Первый. Входим в async_receive, возвращаем из promise_type::initial_suspend значение suspend_always, ничего с очередью не делаем. Получаем короутину async_receive в приостановленном состоянии. Все действия с очередью делаем уже в Awaiter-е после того, как к возвращенному из async_receive объекту применили co_await (что выше уже описал тов.0x55ff).

Второй. Входим в async_receive, возвращаем из promise_type::initial_suspend значение suspend_never, пытаемся сделать неблокирующее чтение из очереди. Если очередь пуста, то делаем co_yield специального значения empty_queue, в этом случае короутина async_receive приостанавливается для повторного входа в нее. Если же очередь была не пуста, то делаем co_return msg. Когда к возвращенному из async_receive значению применяют co_await, то объект Awaiter смотрит, что произошло в короутине. Если был co_yield empty_queue, то Awaiter::await_ready возвращает false, а Awaiter::await_suspend предпринимает действия, чтобы текущую короутину разбудили когда очередь станет не пустой. Если же бы co_return msg, то Awaiter::await_ready возвращает true, а до Awaiter::await_suspend дело вообще не доходит.

Только вот если делать async_receive вторым способом, то возможна потеря взятого из очереди сообщения, если программист напишет вот так:

auto receive_op = async_receive(queue);
...
if(random) return;
...
use(co_await receive_op);

Собственно вопрос в том, насколько много WTF? будет вызывать второй способ реализации async_receive у пользователей.

eao197 ★★★★★
() автор топика
Ответ на: комментарий от eao197

теперь кажется понял

все зависит от того КАК вы реализуете функцию async_receive и ее возвращаемый авейтор

потому как даже в приведенном вами псевдокоде auto receive_op = async_receive(queue);

она может не делать ничего, кроме возвращение авейтора

и некий мифический if(random) return никак не повлияет на очередь

потому что все состоятся только в use(co_await receive_op)

или реализовать в функции и забор сообщения, а потом уже возвращение авейтора

но тогда нужно делать RAII на receive_op что бы при мифическом if(random) return

сообщение закинуть обратно в очередь (т.е. в деструкторе receive_op)

а иначе да, оно будет утеряно

но возвращать авейтор в переменной рассчитывая на то что пользователь сам далее сделает co_await на переменной по моему мнения очень НЕ приятно в использовании

зы нюансы с забором сообщения по итогу, что бы в деструкторе оно опять не вернулось по кругу, я опущу, это само собой разумеется

voipdev
()
Последнее исправление: voipdev (всего исправлений: 1)
Ответ на: комментарий от voipdev

все зависит от того КАК вы реализуете функцию async_receive и ее возвращаемый авейтор

Ну так вот у меня вопрос в том как реализовать async_receive, чтобы это вызывало минимум удивления у пользователей.

Возвращать сообщение обратно в очередь нельзя. Терять в общем случае тоже (т.е. где-то это не будет проблемой, где-то будет).

она может не делать ничего, кроме возвращение авейтора

Формально говоря, async_receive возвращает не Awaitor, а некий объект, который превращается в Awaiter посредством co_await. И в качестве Awaiter может быть как сам этот объект, так и что-то, что этим объектом порождается в operator co_await.

eao197 ★★★★★
() автор топика
Ответ на: комментарий от eao197

не реализовывать забор сообщения из очереди в теле функции async_receive

тогда ее вызов с результатом в переменную, не будет ничего забирать из очереди

а в ее теле можете написать лямбду, в которой будет некий основной или доп механизм забора очереди, и которая будет так же захватывать аргументы функции async_receive

лямбда в свою очередь будет передаваться через конструктор авейтора(буду так называть)

который и будет возвращать ваша функция

при этом кажется, а зачем тогда вообще нужна функция

но я так понимаю вы хотите именно функцию, для того что бы на откуп пользователю предоставить некий апи, скрыв реализацию

voipdev
()
Ответ на: комментарий от eao197

Формально говоря, async_receive возвращает не Awaitor, а некий объект, который превращается в Awaiter посредством co_await.

Я правильно понимаю что планируется порождать неконтролируемо-сложные (отдано на откуп авторам компилятора) и в принципе слабо-контролируемые сущности на каждое сообщение?

bugfixer ★★★★★
()
Ответ на: комментарий от bugfixer

Я правильно понимаю что планируется порождать неконтролируемо-сложные (отдано на откуп авторам компилятора) и в принципе слабо-контролируемые сущности на каждое сообщение?

Да.

eao197 ★★★★★
() автор топика
Ответ на: комментарий от eao197

Я правильно понимаю что планируется порождать неконтролируемо-сложные (отдано на откуп авторам компилятора) и в принципе слабо-контролируемые сущности на каждое сообщение?

Да.

Крайне сомнительная идея, как по мне - мой внутренний «барометр рисков» просто зашкаливает.

bugfixer ★★★★★
()
Ответ на: комментарий от bugfixer

Крайне сомнительная идея, как по мне - мой внутренний «барометр рисков» просто зашкаливает.

Если у меня просят нож с серрейтором, то почему бы не дать? А будут ли им резать хлеб или сырое мясо – это уже не мои проблемы ;)

Если серьезно, то я смотрю на это дело так: в мире C++ появился еще один прием для программирования – асинхронные задачи на базе безстековых короутин. Благо это дело поддерживается компилятором. Для каких-то задач это удобно (ну или кому-то кажется удобным). А раз так, то данный прием будет находить все большее и большее применение. А раз так, то нужно его поддержать.

Что касается накладных расходов, то по моему скромному опыту есть два важных фактора:

  • во-первых, не во всех задачах передача сообщений является узким местом. Иногда очереди сообщений используются для высокоуровневой координации работы, при этом на саму эту координацию приходится доли процента, а все остальное – это сама работа (например, перелопачивания большого объема данных). В таких задачах накладные расходы на десяток-другой короутин в секунду просто ни о чем. А вот если co_await позволяет запрограммировать это дело проще, быстрее (по времени разработки) и с меньшим количеством усилий со стороны программиста, так это же хорошо;

  • во-вторых, я придерживаюсь мнения, что если где-то требуется темп передачи свыше 1000 msg/s между сущностями, то либо что-то делается не так и от обмена сообщениями вообще нужно уходить (например, в сторону того, чтобы все обрабатывалось прямо по месту), либо же должны применяться специализированные, заточенные под конкретную предметную область и условия инструменты (типа нашумевшего в свое время Disruptor-а). В обоих случаях разрабатываемая нами библиотека не нужна и значит мне можно об этом не беспокоится, там люди работают покруче меня и сами знают что им нужно.

Так что да, для задач, где микросекунда – это целая вечность, мой async_receive не подойдет. Но он для таких задач и не предназначается.

eao197 ★★★★★
() автор топика
Ответ на: комментарий от eao197

Если у меня просят нож с серрейтором, то почему бы не дать?

Я не против экспериментов. Но отсутствие «clear path out» на случай если что-то пойдёт не так - реально пугает. Подстелите соломки хотя бы, в виде какого-нибудь batching’а изначально заложенного в архитектуру.

bugfixer ★★★★★
()

В точке (1) мы только создаем ресурсы для некой операции, но сама операция еще не начата. Операция начинается в точке (2) и, если сразу ее результат получить не получится, то нас приостановят пока этот самый результат не готов

Это классический паттерн «отложенное вычисление»

В точке (1) некая операция инициируется и выполняется параллельно пока основной код идет из (1) в (2). В точке (2) мы либо забираем уже готовый результат операции, начатой в (1), либо приостанавливаемся, если результата еще не было.

А это не менее классический «параллельное вычисление»

И оба паттерна классические и давно стали общепринятой практикой.

ПыСы - в случае парллельного вычисления обычно еще возвращают CancelRoutine который можно (нужно) дергать если «ой я передумал» - но даже тогда задача обработки накопленного стейта остается на вызвавшем Cancel. Если разработчик использовал реализованный по паттерну «параллельное вычисление» режим и просрал наработанный стейт (полученное в фоне сообщение) - это его заднепроходные проблемы. Он запросил данные - он получил данные - он обязан обработать полученное. То что он оказался не готов ибо поспешил запросить - его личная головная боль.

no-dashi-v2 ★★★★
()
Ответ на: комментарий от no-dashi-v2

Если разработчик использовал реализованный по паттерну «параллельное вычисление» режим и просрал наработанный стейт (полученное в фоне сообщение) - это его заднепроходные проблемы.

Так-то оно так, но запрос сообщения – это все-таки не «параллельное вычисление» и профукать сообщение с командой сделать что-то не то же самое, что отказаться забирать результат, скажем, ресайзинга картинки.

Поэтому я здесь специально акцентирую внимание на операции async_receive.

eao197 ★★★★★
() автор топика
Для того чтобы оставить комментарий войдите или зарегистрируйтесь.