LINUX.ORG.RU

IPC, алгоритм синхронизации


1

2

Привет.
Есть 2 буфера, которые будут лежать в shared memory, в один из них будет производиться запись из 2-х процессов. Как только этот буфер заполнится, он отсылается по сети, в это время используется второй буфер. Timestamp'a в данных нет, порядок записи имеет значение.
Собственно, как это правильно синхронизировать, при условии, что мьютексов и семафоров нет. Есть только сообщения, которые процессы могут посылать друг другу.

Я пока надумал 2 возможных решения:
1. Посылать сообщения как механизм синхронизации. Решение простое, но кажется, что неэффективное, потому как сообщений будет чуть более, чем докуя.
2. Каждому процессу писать в свой буфер, а при заполнении синхронизировать (опять же сообщениями). Но тогда встаёт вопрос, как лучше синхронизировать, если timestamp'a нету. Его добавление влечёт собой 4(?) дополнительных байта.

Если у кого-нибудь есть дельные мысли, прошу поделиться. Спасибо.

★★★★★

Последнее исправление: UVV (всего исправлений: 1)

пусть пишет тока один процесс в буфер, работая как овнер над буфером, а второй ему шлёт то что надо записать

Vernat ★★
()
Ответ на: комментарий от Vernat

Ну, собственно, вариант, спасибо. Владелец, так и так, будет один, тот, который память выделил. Однако этот вариант схож с моим первым и не отменяет много сообщений...

UVV ★★★★★
() автор топика
Ответ на: комментарий от UVV

я думаю во всех других вариантах ты огребёшь отлаживаться, и всеравно будут ошибки, либо надо много курить по этой теме, наверняка не ты первый кто с этим сталкивается.

Vernat ★★
()

1. RabbitMQ, ActiveMQ, etc
2. timestamp это лишь частный случай task index. Измеряться может как битах, так и байтах, таких как захочешь.

gh0stwizard ★★★★★
()

Может лучше схема с токенами и множеством буферов.Токен - это индекс буфера размером с байт. Тот кто владеет токеном, тот может работать с буфером. Байт можно передавать через pipe другим процессам. Изначально всеми токенами владеет мастер-процесс, он раздает рабочим процессам токены, те заполняют буфера и возвращают токены назад, возвращенные токены отдаются отдельному процессу, который пишет в сеть.

Хотя хз, что у тебя там за задача. Может не нужны тебе, ни два процесса, ни shared memory.

pathfinder ★★★★
()

Странные какие-то ограничения. Разделяемая память и сообщения есть, а мютексов нет? Лочь файл, чо.

tailgunner ★★★★★
()
Ответ на: комментарий от gh0stwizard

RabbitMQ, ActiveMQ

Не позорься. Эти слоники могут только лениво махать хвостиком. Ладно б еще ZMQ, но эти вообще мимо.

anonymous
()
Ответ на: комментарий от anonymous

но эти вообще мимо

Че мимо-то? Для определенных задач подходят. Я же не телепат и etc не зря поставил :)

gh0stwizard ★★★★★
()
Ответ на: комментарий от anonymous

Эти слоники могут только лениво махать хвостиком.

А конкретные примеры будут? Я знаю кучку проектов, в которых как activemq, так и rabbitmq прекрасно работают.

tazhate ★★★★★
()
Ответ на: комментарий от tailgunner

Странные какие-то ограничения. Разделяемая память и сообщения есть, а мютексов нет? Лочь файл, чо.

Файлов нет, ничего нет =)
Есть разделяемая память и сообщения (не POSIX).

UVV ★★★★★
() автор топика

Сделать блокировку на атомиках? либо lockless буффер. Я как раз сейчас этим развлекаюсь

vromanov ★★★
()
Ответ на: комментарий от pathfinder

Задача скидывать логи по сети. Только один процесс имеет доступ к сетевой карте. 2 процесса уже есть и от меня это не зависит =)

UVV ★★★★★
() автор топика
Ответ на: комментарий от UVV

Какая-то ублюдочная недоRTOS? Тогда делай передачу токена, как выше посоветовали. Обмен сообщениями в таких системах должен быть быстрым.

tailgunner ★★★★★
()
Ответ на: комментарий от UVV

Header

#ifndef LL_BUFF_H
#define	LL_BUFF_H
#include "rx_atomic.h"
#include "rx_common.h"
#include "ll_bitset.h"


#ifdef	__cplusplus
extern "C" {
#endif

#define LL_BUFFER_FREE 0
#define LL_BUFFER_WRITE 1
#define LL_BUFFER_FULL 2

    typedef uint16_t ll_record_data_size_t;

    typedef struct {
        rx_atomic_int32_t state;
        int32_t size;
        rx_atomic_int32_t used_bytes;
        rx_atomic_int32_t lock_count;
        rx_atomic_int32_t msg_count;
        uint8_t data[];
    } ll_buffer_t;

    typedef struct {
        ll_record_data_size_t data_size; //This include only data size
        int64_t timestamp;
        uint8_t data[0];
    } __attribute__((__packed__)) ll_record_t;

#define LL_RECORD_HEADER_SIZE (sizeof(ll_record_t)+sizeof(ll_record_data_size_t))
#define LL_RECORD_MAX_SIZE (0xffff - LL_RECORD_HEADER_SIZE)
#define LL_BUFFER_CHUNK_MAX_COUNT 10

    typedef struct {
        uint32_t count;

        struct {
            const void* data;
            ll_record_data_size_t size;
        } chunks[LL_BUFFER_CHUNK_MAX_COUNT];
    } ll_buffer_chunks_t;

    typedef enum {
        ll_alloc_ok,
        ll_alloc_no_space,
        ll_alloc_error,
    } ll_alloc_result_t;

    ll_alloc_result_t ll_buffer_reserve(ll_buffer_t* b, int32_t data_size, void** data);
    ll_alloc_result_t ll_buffer_submit(ll_buffer_t* b);
    ll_alloc_result_t ll_buffer_push_ex(ll_buffer_t *b, const ll_buffer_chunks_t* chunks, int64_t timestamp);
    ll_alloc_result_t ll_buffer_push(ll_buffer_t *b, const void* data, ll_record_data_size_t size);


#ifdef	__cplusplus
}
#endif

#endif	/* LL_BUFF_H */


vromanov ★★★
()
Ответ на: Header от vromanov
#include "ll_buff.h"
#include "rx_utils.h"
#include "rx_shm_buffer.h"

ll_alloc_result_t ll_buffer_reserve(ll_buffer_t* b, int32_t data_size, void** data) {
    *data = NULL;
    __sync_add_and_fetch(&b->lock_count, 1);
    while (1) {
        if (b->state != LL_BUFFER_WRITE) {
            __sync_sub_and_fetch(&b->lock_count, 1);
            return ll_alloc_no_space;
        }
        int32_t used = b->used_bytes;
        if (used + data_size > b->size) {
            __sync_bool_compare_and_swap(&b->state, LL_BUFFER_WRITE, LL_BUFFER_FULL);
            __sync_sub_and_fetch(&b->lock_count, 1);
            return ll_alloc_no_space;
        }
        if (__sync_bool_compare_and_swap(&b->used_bytes, used, used + data_size)) {
            *data = b->data + used;
            return ll_alloc_ok;
        }
    }
}

ll_alloc_result_t ll_buffer_submit(ll_buffer_t* b) {
    __sync_add_and_fetch(&b->msg_count, 1);
    __sync_sub_and_fetch(&b->lock_count, 1);
    return ll_alloc_ok;
}

ll_alloc_result_t ll_buffer_push(ll_buffer_t *b, const void* data, ll_record_data_size_t size) {
    ll_buffer_chunks_t chunks = {1,
        {
            {data, size}
        }};
    return ll_buffer_push_ex(b, &chunks, 0);
}

ll_alloc_result_t ll_buffer_push_ex(ll_buffer_t *b, const ll_buffer_chunks_t* chunks, int64_t timestamp) {
    uint32_t i, size = 0;
    for (i = 0; i < chunks->count; ++i) {
        size += chunks->chunks[i].size;
    }
    if (size == 0 || size > LL_RECORD_MAX_SIZE) {
        return ll_alloc_ok;
    }

    ll_record_t* record;
    ll_alloc_result_t res = ll_buffer_reserve(b, LL_RECORD_HEADER_SIZE + size, (void**) &record);
    if (res != ll_alloc_ok) {
        return res;
    }
    record->data_size = size;
    record->timestamp = (timestamp <= 0) ? getTimeMicroseconds() : timestamp;
    uint8_t* data_pos = record->data;
    for (i = 0; i < chunks->count; ++i) {
        memmove(data_pos, chunks->chunks[i].data, chunks->chunks[i].size);
        data_pos += chunks->chunks[i].size;
    }
    *((ll_record_data_size_t*) (data_pos)) = size;
    return ll_buffer_submit(b);
}
vromanov ★★★
()
Ответ на: комментарий от vromanov

Это некоторый оверкилл.. Тут сделана закладка, чтобы по сообщениям в буфере можно было итерироваться вперед/назад. Можно обойтись ll_buffer_reserve и ll_buffer_submit

vromanov ★★★
()
Ответ на: комментарий от UVV

Файлов нет, ничего нет =)
Есть разделяемая память и сообщения (не POSIX).

linux

Эм. На ноль не дели, ага?

Используй мютексы и не парь мозг.

anonymous
()
Ответ на: комментарий от tazhate

А конкретные примеры будут?

С персистентностью они еле ворочаются. 50k mps довольно таки мало для определенного круга задач. Легче тупо в логи скидывать, а потом подбирать.

anonymous
()
Ответ на: комментарий от tailgunner

Млин, не получится это. Могу только сказать, что процессы не имеют доступа ни к каким общим ресурсам, кроме памяти, и имеют доступ к проприетарному memory management api.

UVV ★★★★★
() автор топика
Ответ на: комментарий от anonymous

Эм. На ноль не дели, ага?

Ок, у меня signal handler установлен на этот случай.

UVV ★★★★★
() автор топика
Ответ на: комментарий от UVV

Могу только сказать, что процессы не имеют доступа ни к каким общим ресурсам, кроме памяти, и имеют доступ к проприетарному memory management api.

...и какому-то IPC. Вот по этому IPC и гоняй байт токена.

tailgunner ★★★★★
()
Ответ на: комментарий от UVV

Это общая память и есть. Т.е. ll_buffer_t должен лежать в разделемой памяти. Атомики на мипсе есть. Атомик это способ поменять переменную в общей памяти, так чтобы другие потоки, процессора не могли ее одновременно поменять. Погугли по словам mips atomic. Вот, например, http://sourceware.org/ml/libc-ports/2012-06/msg00028.html

vromanov ★★★
()
Ответ на: комментарий от tailgunner

Ок, буду смотреть.

UVV ★★★★★
() автор топика
Ответ на: комментарий от vromanov

Тоже спасибо, завтра почитаю.

UVV ★★★★★
() автор топика
Ответ на: комментарий от UVV

Млин, не получится это. Могу только сказать, что процессы не имеют доступа ни к каким общим ресурсам, кроме памяти, и имеют доступ к проприетарному memory management api.

Реализуй собственные мутексы. Всё необходимое у тебя есть: общая память и атомарные инструкции процессора. Как примитивный спинлок пишется, знаешь?

anonymous
()

Один из процессов сделать главным, а второй пусть сообщениями передаёт данные (ну не гигобайты же у тебя)

upd: глянул - вроде 100 байт можно передать.

ziemin ★★
()
Последнее исправление: ziemin (всего исправлений: 1)

Lockless queue, 2 процесса пишут в неё, кто-нибудь один читает очередь и пишет в буфер.

mv ★★★★★
()
Ответ на: комментарий от anonymous

50k mps довольно таки мало для определенного круга задач

Я как бы про этот круг задач и спрашиваю. Мы же не знаем, какая задача у ТС.

tazhate ★★★★★
()
Ответ на: комментарий от tazhate

Мы же не знаем, какая задача у ТС.

Зато мы знаем какая у него железяка. И AMQP (а тем более брокер на этой железяке) будет явным оверкилом.

anonymous
()
Ответ на: комментарий от tazhate

Я как бы про этот круг задач и спрашиваю.

Любая объемная статистика с толстым потоком. Брокер сообщений здесь как бы напрашивается, но, по факту, лишнее звено.

anonymous
()
Ответ на: комментарий от anonymous

ZMQ

+1 к ZMQ, там именно что IPC поддерживается, правда есть ограничения на ОС (если не ошибаюсь), но в Linux есть 100%

I-Love-Microsoft ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.