LINUX.ORG.RU

Учебная задача: оптимизировать работу с очередью

 


0

4

Учебная задача. Симулятор обработки сетевых пакетов. Дан буфер размера size и набор Npac пар чисел Ti-Di. В моменты Ti приходят пакеты, на обработку которых нужно Di времени. Di может быть 0. Несколько Ti подряд могут совпадать, но они гарантированно не убывают. Пришедший пакет добавляется в хвост буфера. Обрабатывается только пакет в голове, на 0-м месте, остальные ждут. Если подряд идут несколько пакетов длительностью 0, они обрабатываются в одном такте, в том же, когда первый из них дошёл до головы, и этот же такт засчитывается и в обработку следующего за ними ненулевого пакета. Когда буфер заполнен, приходящие пакеты сбрасываются.

Требуется выдать массив времён, когда началась обработка каждого пакета. Или «-1», если пакет был сброшен.

Я сделал предельно просто — счётчик тактов времени, на каждом такте пробегается вначале очередь, всем ждущим снижается время ожидания, если что-то доходит до головы — обрабатывается, затем пробегается массив пакетов от первого необработанного до первого со временем прибытия Ti больше текущего. Работает правильно, но долго.

Попробовал ускорить, двигать время не по 1 такту, а определять дельты до прихода ближайшего пакета или окончания обработки головы очереди, что раньше. Выигрыш получился несущественный. Оптимизации с заменой list на collections.deque и хранением длин массивов в отдельных переменных дали прирост 10-20%

Почитал советы — говорят, надо избавиться от цикла со счётчиком времени и работать с событиями. Какая принципиальная разница с дельтами? Попробовал, запутался в алгоритме, постоянно что-то теряется. Получилось гораздо быстрее, но возможно, потому, что неправильно.

Вариант с дельтами:

import time, sys, collections
ar = list(map(int, sys.stdin.read().split()))
size, npac = ar[0:2]
packets = [ar[lcv*2+2:lcv*2+4] for lcv in range(npac)]
packets = [{'arrival':arrival, 'duration':duration, 'start':-1, 'time':duration} for arrival, duration in packets]
maxtime = 1 + sum(p['arrival'] + p['duration'] for p in packets)
queue = collections.deque()
curp = 0 # номер первого необработанного пакета
t = 0
nextarrival = 0
nextqueue = 0
delta = 1

while t < maxtime:
    # обработать очередь
    if queue:
        queue[0]['time'] -= 1
        if queue[0]['time'] <= 0:
            queue.popleft()
            while queue and queue[0]['time'] <= 0:
                queue[0]['start'] = t
                queue.popleft()
            if queue:
                queue[0]['start'] = t
                nextqueue = queue[0]['time']
    # проверить прибытие пакетов
    for i in range(curp, npac):
        p = packets[i]
        if t < p['arrival']:
            nextarrival = p['arrival']
            break
        curp = i+1
        if t == p['arrival']:
            if not queue:
                    p['start'] = t
                    if p['duration'] > 0:
                        queue.append(p)
            elif len(queue) < size:
                    queue.append(p)
            else:
                p['start'] = -1
    # выход, всё закончилось
    if len(queue) < 2 and curp >= npac:
        break
    # время следующего события
    tlast = t
    t = max(min(nextarrival, nextqueue), t+1)
    delta = t - tlast

for p in packets:
    print(p['start'])

Что и как здесь можно ускорить?

Или как делать иначе? Цикл по массиву пакетов, где в ожидании очередного прибытия прокручивается очередь, у меня получился слишком запутанным. Другие варианты есть?

★★★★★

на каждом такте пробегается вначале очередь, всем ждущим снижается время ожидания,

О_о. Зачем? !!

У тебя Ti неубывает.

Пакеты упорядочены в порядке их обработки.

Обрабатывай голову и все последующие до первого ненулевого времени (с учетом времени обработки). Время храни и увеличивая.

soomrack ★★★★★
()
Ответ на: комментарий от question4

Псевдокод, который демонстрирует основную мысль (но в который много чего надо добавить, чтобы задачу правильно решал).

current_time = 0;

for(packet in queue) {
    if(current_time < packet_time) {
        current_time = packet.Ti;
    }
    packet.start_time = current_time;
    current_time += packet.Di;
}

for(packet in queue) {
    print(packet.start_time);
}
soomrack ★★★★★
()
Последнее исправление: soomrack (всего исправлений: 1)

Если питон, то попробуй SimPy.

А вообще, это отдельная большая область. Приведу разные названия, но они обо одном и том же: «дискретно-событийное моделирование», «системы массового обслуживания», «сети очередей».

Странно. Я давно наблюдаю за твоими постами. Думал, что ты все знаешь, а тут пытаешься переизобрести колесо


Вообще, у меня есть и свои собственные инструменты на эту тему. Только я пока не совсем понял, правильно ли я понял твою задачу. Слишком краткое описание, а на длинное я не готов. По крайней мере, не сейчас вечером

anonymous
()
Ответ на: комментарий от question4

Какое время?

Текущее эмулируемое

Смотришь два события:

  • очередной пакет пришёл
  • очередной пакет из очереди обработан

смотришь, что раньше наступит

  1. пришёл пакет

очередь полностью забита? отбрасываешь

если нет, добавляешь в очередь

если очередь была пустая, текущее время заменяешь на время прибытия пакета

  1. пакет обработан

увеличиваешь текущее время на время обработки этого пакета

удаляешь его из очереди

да, если обработчик сразу забирает пакет из очереди (ещё в процессе обработки), можно считать как очередь длины N+1

router ★★★★★
()
Последнее исправление: router (всего исправлений: 2)

Код не читал, но на вскидку вырисовывается что для выдачи ответа не стоит заводить никаких коллекций кроме исходного list пакетов и выходного list ответов

list ответов изначально инициализируем всеми None; он в дополнение к нескольким числовым переменным будет являться носителем состояния алгоритма.

Числовые переменные, описывающие состояние следующие:

  • такт времени (инициализируется T₀)
  • индекс пакетв во входном массиве пакетов, который начал обрабатываться в этот такт времени (инициализируется нулём)
  • степень занятости очереди в момент начала обработки пакета выше (инициализируется единицей)
  • индекс первого пакета про который мы ещё не знаем успел он попасть или был отброшен, так как время его прихода позже текущего такта (инициализируется единицей)

И далее пишем достаточно замороченный по коду но суммарно линейный по количеству пакетов (то бишь быстрый) алгоритм перехода такта времени от текущего к текущему + Dᵢ смотрим сколько пришло пакетов за период Dᵢ соответственно сдвигая актуальный размер очереди, индекс необработанных и сразу помечая те которые не влезли в массиве ответов -1 После этого сдвигаем индекс обрабатываемого, глядя на массив ответов - там где -1 то было отброшено, в очереди его нет. А там где None - то в очереди, надо обрабатывать

по идее с таким подходом в цикле не будет никакиз реаллокаций коллекций, а сложность будет линейной.

Грубо говоря внешний цикл итерирует по событиям начала и конца обработок, п внутренний по событиям прихода за период обработок. Но несмотря на вложенность циклов - суммарно по всем итерациям внешнего - внутренний сделает тоже коливество итераций что Nₚ и алгоритм будет линейным

GPFault ★★★
()

как делать иначе?

Было бы неплохо разбить логику на более высокоуровневые элементарные операции и собрать решение из них, а то в бульоне из низкоуровневых действий трудно что-то понять.

Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 1)
Ответ на: комментарий от question4

Утро вечера мудреннее. Если я правильно понял твою задачу, то ее можно решить прямо вот так, как ты хочешь, используя изыки C#/F# и Haskell. Для C#/F# это пока только у меня на компьютере, а вот на Haskell можно сделать прямо сейчас (и бесплатно), используя библиотеку aivika. Там можно сделать все, что ты написал в своем коде на питоне, но придется программировать.

Однако я сейчас покажу, как можно сформулировать твою задачу на высокоуровневом языке VisualAivika. Это визуальная IDE, и нам придется пока втиснуться в существующие ограничения языка, но наградой будет то, что такую модель легко расширять. Также там легко строить графики и выводить статистику. Например, можно посмотреть график отклонений для трендов и доверительных интервалов по методу 3-сигм, можно вывести сводную статистику и гистограммы. Можно и провести анализ чувствительности к внешним параметрам.

Ниже будут приведены показатели, если для которых вывести экстремумы, то там четко будет видно, что мы не выходим за пределы ёмкости ресурса, который обрабатывает твои пакеты.

Проблема в твоем коде в том, что ты пытаешься использовать так называемый метод имитации, «управляемой временем». Концептуально это самый простой метод, но как ты сам заметил, у него есть изъяны в производительности, да и, честно говоря, ты замучаешься реализовывать что-то с более сложным поведением.

Итак, у нас есть поток входящих пакетов. В VisualAivika придется определить как поток внешних событий, где задержка между приходом событий подчиняется некоторому случайному распределению. Пусть задержка будет иметь целочисленное равномерное распределение от 0 до 5.

// это случайный поток пакетов
// (здесь распределение равномерное целочисленное от 0 до 5)
Packets = Stream.randomInt(0, 5);

Мы считаем, что при обработке пакетов есть предельная ёмкость очереди, а также введем параметр, который задаст максимальное время обработки пакетов.

// ёмкость очереди
Size = 5;

// максимальная длительность обработки
MaxDuration = 7;

Для блокировки обработки пакетов нам понадобится ресурс, который может обрабатывать не более одного пакета за раз.

// это ограниченный ресурс (прибор)
Resource = Facility.create();

Для ограничения ёмкости будем накапливать статистику по пакетам, находящимся в очереди ресурса на обработку. Для самоконтроля дополнительно еще будем учитывать количество пакетов в очереди (для иллюстрации идеи). Первые два момента могут совпадать с ресурсом, но верхний экстремум будет на 1 больше (из-за блокировки).

// для самоконтроля - сколько пакетов в очереди
AvailableQueue = Queue.create();

Нам, конечно, интересно количество отказов. Заведем статистику и для этого.

// чтобы посмотреть статистику по отказам
FailingQueue = Queue.create();

Теперь определим обработку пакетов (заявок, транзактов - кто как называет). Входящие заявки будут поступать на блок, который будет решать, а не превышена ли ёмкость ресурса? В зависимости от этого пакеты будут перенаправлены на соответствующие блоки.

// Здесь и далее в IDE переносы каретки придется заменить на пробелы,
// а знаки точки с запятой (в конце) вводить не нужно

// сюда поступают пакеты на обработку
Chain1Init =
  Block.select(if Facility.queueLength(Resource) < Size
               then Chain2Proceed
               else Chain3Fail);

Если ёмкость ресурса позволяет, то ставим пакет в очередь на обработку, где возможна блокировка всей цепочки, пока имитируем активность для обработки пакета.

// ёмкость ресурса позволяет обработать пакеты
Chain2Proceed =
  Block.queue(AvailableQueue) >>>
  Block.seize(Resource) >>>
  Block.depart(AvailableQueue) >>>
  Block.advance(randomInt(0, MaxDuration)) >>>    // это задержка (пусть равномерная)
  Block.release(Resource) >>>
  Block.terminate;

(продолжение следует)

anonymous
()
Ответ на: комментарий от question4

Иначе, если ёмкость ресурса заполнена, то считаем, что пакет теряется, но статистику все равно собираем, чтобы потом посмотреть ее на графиках, например, на графике отклонения (тренд + доверительные интервалы).

// просто собираем статистику по отказам в обработке пакетов
Chain3Fail =
  Block.queue(FailingQueue) >>>
  Block.depart(FailingQueue) >>>
  Block.terminate;

Теперь у нас есть все, чтобы запустить имитацию модели.

// запускаем всю модель
Runner = do! Block.runByStream(Packets, Chain1Init);

Настало время для вывода графиков и таблиц. Чтобы в VisualAivika посмотреть их, сначала нужно сформировать показатели. Показатели сами дискретно-событийные, но при переходе на графики часть из них немного огрубляется, но зато это позволяет использовать их в тех же системах обыкновенных дифференциальных уравнений (системная динамика).

// статистика (можно строить графики и таблицы)

// общее количество отказов
FailCount = Queue.enqueueCount(FailingQueue);

// общее количество пакетов, посланных на обработку
ProcessingCount = Queue.enqueueCount(AvailableQueue);

// время ожидания в очереди
WaitTime = Facility.waitTime(Resource);

// дискретизированная длина очереди (для ресурса ограниченной ёмкости)
Len = Facility.queueLength(Resource);

// фактическая статистика по длине очереди (для ресурса ограниченной ёмкости)
LenStats = Facility.queueLengthStats(Resource);

Экстремумы для Len и LenStats должны быть в пределах установленной ёмкости. Это можно увидеть, если вывести сводную статистику.

Тем не менее, для самоконтроля еще выведем дополнительные показатели.

// статистика для самоконтроля

// дискретизированный ограничитель очереди в моменте (верхний экстремум может случайно оказаться 1 + Size)
AvailableQueueLen = Queue.content(AvailableQueue);

// совокупная статистика по ограничителю очереди (верхний экстремум будет 1 + Size)
AvailableQueueLenStats = Queue.contentStats(AvailableQueue);

Короче, мы определи случайных поток входящих событий. Потом ввели ресурс ограниченной ёмкости, а при обработке ресурсом использовали случайную задержку.

Да, это отличается прямо от твоего кода. Это дает информацию о поведении системы в целом, но если тебе прямо нужно скармливать конкретные числа на входе и видеть конкретные числа на выходе, то тогда добро пожаловать в мир программирования на языке Haskell! Там можно всю эту модель повторить один-во-один, плюс у тебя будет вся мощь языка программирования.

anonymous
()