LINUX.ORG.RU

Сортировать и синхронизировать массивы

 , ,


0

1

Имеются 2 массива данных I(e,t) в виде 2 троек 1-мерных массивов одинаковой длины: t1, e1, I1 и t2, e2, I2. (Точнее, это — группы в открытых HDF-файлах f1['data/t'], f1['data/e'], f1['data/i'] и f2['data/t'], f2['data/e'], f2['data/i']). Данные отсортированны по возрастанию e, затем по возрастанию t. e и t образуют регулярную сетку с шагами ~0.001 и 1. I задана не во всех узлах. Повторяющихся узлов нет.

Требуется проверить, совпадают ли массивы I1 и I2. Оба файла содержат одинаковый набор пар e-t. Но из-за ошибок округления часть изначально совпадавших e немного отличается. Поэтому данные в I2 перетасованы относительно I1 — одинаковые индексы необязательно соответствуют одинаковым значениям e и t.

Имеется питоновская библиотека для чтения этих файлов, h5py. Есть ли какой-то штатный способ быстро отсортировать I2, вначале сравнивая t как целые числа, а затем e — как дробные с заданной погрешностью (скажем, абсолютная погрешность 1e-3)? Реализовать это сам я могу, но вдруг есть готовое?

Можно ли получить искомое, применив numpy.sort к list( zip( t1, e2, I2 ) ) ? Как там задать погрешность? Или есть что-то получше?

Пока сделал так:

#!/usr/bin/python3
from sys import argv
import h5py
import numpy as np

f1, f2 = h5py.File( argv[1],'r' ), h5py.File( argv[2],'r' ) 
I1, I2 = np.array(f1['data/i']), np.array(f2['data/i'])
t1, t2 = np.array(f1['data/t']), np.array(f2['data/t'])
e1, e2 = np.ma.round(np.array(f1['data/e']), 7), np.ma.round(np.array(f2['data/e']), 7)

j1, j2 = np.lexsort( (e1, t1) ), np.lexsort( (e2, t2) )

a1 = np.stack( ( t1[j1], e1[j1], I1[j1] ), axis=-1 )
a2 = np.stack( ( t2[j2], e2[j2], I2[j2] ), axis=-1 )

print( ( a1 == a2 ).all() )

Или так:

#!/usr/bin/python3
from sys import argv
import h5py
import numpy as np

f1, f2 = h5py.File( argv[1],'r' ), h5py.File( argv[2],'r' ) 
I1, I2 = np.array(f1['data/i']), np.array(f2['data/i'])
t1, t2 = np.array(f1['data/t']), np.array(f2['data/t'])
e1, e2 = np.array(f1['data/e']), np.array(f2['data/e'])

j1, j2 = np.lexsort( (e1, t1) ), np.lexsort( (e2, t2) )

print( ( t1[j1] == t2[j2] ).all() and
       ( abs(e1[j1] - e2[j2]) < 0.0000001 ).all() and
       ( abs(I1[j1] - I2[j2]) < 1 ).all() )

★★★

Последнее исправление: olegd (всего исправлений: 4)

я в numpy не очень

но по-моему решение задачи не очень тривиальное (безотносительно языка), и если данные действительно могут быть перетасованы, то выход один: идти по одному из массивов, и проверять входит ли i-й элемент во второй массив, просматривая его полностью.

Понятное дело, что проверка всяких тривиальных случаев и простейшая оптимизация ускорят процесс. Но это единственный верный выход в случае, если данные перемешаны. Адекватное «родное» врядли может быть чем-то иным.

У вас там и по e и по t округление случилось?

sshestov
()
Ответ на: я в numpy не очень от sshestov

У вас там и по e и по t округление случилось?

t целые, только по e. Каждый цельный кусок с постоянным e и возрастающими t стал двумя — с e, отличающимися на ~1e-14 :)

olegd ★★★
() автор топика
Ответ на: комментарий от maxim2

https://stackoverflow.com/questions/29352511/numpy-sort-ndarray-on-multiple-c...

Спасибо, но с сортировкой по именам я уже разобрался (сделал как 2-й вариант по ссылке, через dtype). Остался вопрос, как лучше объединять массивы. Ничего быстрее, чем list(zip( ... )) нет?

И гораздо важнее вопрос: как сортировать float, чтобы отличающиеся меньше, чем на «эпсилон», считались равными? numpy.finfo(float).eps можно присвоить любое значение, но на поведении numpy.sort() это не отражается.

olegd ★★★
() автор топика

Забудьте уже про округление, неужели не очевидно что два сколь угодно близких числа всегда могут округлиться в разные стороны? И про сортировку забудьте. Ваша задача - проверить что для каждого элемента из одного массива есть элемент в другом массиве в заданной окрестности (в обе стороны).

slovazap ★★★★★
()
Ответ на: комментарий от slovazap

Как это сделать просто и быстро?

Можно, конечно, искать каждый элемент в цикле и сравнивать серией if-ов. Но можно же проще?

olegd ★★★
() автор топика
Последнее исправление: olegd (всего исправлений: 1)
Ответ на: комментарий от olegd

Проще квадратичного алгоритма ничего нет, не знаю правда что вы имеете в виду под серией if’ов. Если он не устраивает по производительности, вариантов масса в зависимости от размеров массивов и распределения точек, включая всевозможные деревья и spatial хэши. Кейворд - «fast nearest-neighbor lookup». Можно сразу сюда посмотреть https://docs.scipy.org/doc/scipy/reference/spatial.html

slovazap ★★★★★
()
Ответ на: комментарий от olegd

Если делать sort с dtype, не работают операции сравнения и вычитания массивов. Пока сделал так:

#!/usr/bin/python3
from sys import argv
import h5py
import numpy

f1, f2 = h5py.File( argv[1],'r' ), h5py.File( argv[2],'r' ) 
I1, I2 = f1['data/i'], f2['data/i']
t1, t2 = f1['data/t'], f2['data/t']
e1, e2 = numpy.ma.round(f1['data/e'], 9), numpy.ma.round(f2['data/e'], 9)

j1, j2 = numpy.lexsort( (e1, t1) ), numpy.lexsort( (e2, t2) )

a1 = numpy.array([ ( t1[j1[l]], e1[j1[l]], I1[j1[l]] ) for l in range(len(j1)) ])
a2 = numpy.array([ ( t2[j2[l]], e2[j2[l]], I2[j2[l]] ) for l in range(len(j2)) ])

print( ( a1 == a2 ).all() )

numpy.ma.round(f1['data/e'], 7) округляет знаки до 7-го после запятой для всех элементов массива. Для имеющихся данных хватило бы 3, теоретически могут попасться данные, где нужно 6.

numpy.lexsort( (e1, t1) ) возвращает массив индексов, в каком порядке следует переставить e1 и t1, чтобы они оказались отсортированы по t1, затем по e1.

( a1 == a2 ).all() возвращает True, если все соответствующие элементы обоих массивов равны. Или False, если есть различия.

Отработало нормально. Но формирование массивов a1 и a2 очень медленное. Как бы ускорить процесс?

olegd ★★★
() автор топика
Ответ на: комментарий от slovazap

Проще квадратичного алгоритма ничего нет,

Какого именно? Гугл предлагает алгоритмы сортировки квадратичной сложности.

не знаю правда что вы имеете в виду под серией if’ов

if abs(f1-f2) < eps:
  ...
elif f1 > f2:
  ...
else:
  ...

Просто и громоздко.

fast nearest-neighbor lookup

Нахожу только многомерные. Для 1-мерного должно быть что-то проще и быстрее.

https://docs.scipy.org/doc/scipy/reference/spatial.html

Что-то из них пробовал на этой же машине для совсем другой задачи (типа диаграмм Вороного). Точек было на порядок меньше, но работало очень долго по сравнению с numpy.lexsort.

olegd ★★★
() автор топика
Ответ на: комментарий от olegd

elif f1 > f2:

Какие ещё elif и else? if abs(f1-f2) < f_eps and abs(e1-e2) < e_eps and abs (I1-I2) < I_eps: значит мы нашли совпадение для t1, e1, I1, можно break’нуться из внутреннего цикла и начать поиск для следующей точки. Если во внутреннем цикле ничего не нашли, значит массивы не совпадают.

Нахожу только многомерные.

У вас же трёхмерные данные.

slovazap ★★★★★
()
Ответ на: комментарий от slovazap

Нужно всё разбить на массивы с равным t (целым), отсортировать их по t, и каждый внутри отсортировать по e (дробному). А после этого сравнивать I.

olegd ★★★
() автор топика
Ответ на: комментарий от olegd

a1 = numpy.array([ ( t1[j1[l]], e1[j1[l]], I1[j1[l]] ) for l in range(len(j1)) ])

А не лучше ли будет не создавать питоновские списки, а переложить всё на numpy?

a1 = numpy.stack( ( t1, e1, I1 ), sort=-1)[j1]

question4 ★★★★★
()
Ответ на: комментарий от question4

То есть не «sort=-1», а «axis=-1». И быстрее будет сортировать каждый линейный массив отдельно:

a1 = numpy.stack( ( t1[j1], e1[j1], I1[j1] ), axis=-1)

question4 ★★★★★
()
Ответ на: комментарий от slovazap

по-моему данные всё-же

одномерные (как я понял задачу).

И мне кажется ошибка сидит здесь в особенности питона

a1 = numpy.array([ ( t1[j1[l]], e1[j1[l]], I1[j1[l]] ) for l in range(len(j1)) ])
a2 = numpy.array([ ( t2[j2[l]], e2[j2[l]], I2[j2[l]] ) for l in range(len(j2)) ])

я в питоне не силен, но если j1 и j2 индексы исходных линейных массивов, то в IDL-e оно бы сразу отработало.

Но я согласен с Доном slovazap в том, что там только полный перебор (впрочем, всякие тривиальные проверки могут очень сильно сократить время, особенно в случае разных массивов)

sshestov
()
Ответ на: по-моему данные всё-же от sshestov

по-моему данные всё-же одномерные (как я понял задачу)

Как можно это написать, а на 2 строки ниже привести код с тройками?

Эта задача по нечёткому сравнению двух наборов трёхмерных точек.

slovazap ★★★★★
()
Ответ на: комментарий от question4

a1 = numpy.stack( ( t1[j1], e1[j1], I1[j1] ), axis=-1)

Ускорилось в 200 раз. Спасибо.

Только пришлось все группы HDF явно преобразовать в numpy.array.

olegd ★★★
() автор топика
Ответ на: по-моему данные всё-же от sshestov

И мне кажется ошибка сидит здесь в особенности питона

Какая ошибка? Эти строки нужны были, чтобы объединять 3 линейных массива в один 3х200 000, чтобы проще было сравнивать. Этот шаг был очень медленным, да, поэтому я заменил всё на numpy.stack().

если j1 и j2 индексы исходных линейных массивов, то в IDL-e оно бы сразу отработало.

j1 и j2 — массивы индексов изначальных массивов, в каком порядке расставить их элементы, чтобы было отсортировано. Генерируются numpy.lexsort()

Но я согласен с Доном slovazap в том, что там только полный перебор (впрочем, всякие тривиальные проверки могут очень сильно сократить время, особенно в случае разных массивов)

Проверка исходит из предположения, что оба файла содержат одну таблицу, отличающуюся только порядком расположения троек и иногда последним знаком в колонке «e». (А если отличий больше, далее разбирать нет смысла.) Каждой тройке в файле f1 соответствует ровно одна тройка в файле f2. И чтобы не сравнивать каждую тройку f1 с каждой тройкой f2 (что будет медленно, если нет готовой библиотеки), я сперва сортирую их готовой библиотекой, а потом сравниваю только тройки с одинаковым индексом. Шаг по t строго 1, шаг по e не менее 0.000001 (реально около 0.001), повторений быть не должно.

Можно было бы и не объединять 3 в 1. Просто переставить элементы в каждом линейном массиве в соответствии с результатами сортировки: I1 = I1[j1] Но для меня нагляднее одно сравнение (a1 == a2).all(), чем 3 одинаковых (I1 == I2).all(); (e1 == e2).all(); (t1 == t2).all()

Единственная проблема — я обеспечил совпадение e1 и e2 при помощи round( decimals=7 ). Теоретически, могут попасться 2 числа, которые округлятся в разные стороны. Что делать с этим?

Пока приходит в голову только не объединять и смотреть (abs(e1-e2) < 0.0000001).all()

olegd ★★★
() автор топика
Ответ на: комментарий от slovazap

ну я имел ввиду

что размер массива N, а не N*N. Сложность то алгоритма растет как N^2 (или N^4 во втором случае, что многовато). А то, что там 2 или 3 координаты у каждого элемента, это сложности вообще не добавляет.

sshestov
()
Ответ на: комментарий от slovazap

Эта задача по нечёткому сравнению двух наборов трёхмерных точек.

Да. Но из-за условия, что каждая пара e-t должна встречаться строго по одному разу в обоих файлах (а если нет — проверка не пройдена), а возможные e и t образуют конечную сетку, задача упрощается.

С учётом написанного в постах выше, я пришел к следующему решению. Сортировать тройки по целому t (т.е. без погрешностей); затем по дробному e (т.к. шаг больше погрешностей, а при равном t e уникальны); после чего сравнивать целые столбцы (t1==t2).all(), а дробные (abs(e1-e2)<epsE).all() и (abs(I1-I2)<epsI).all(). Если есть несоответствия в столбцах t и e, значит наборы точек разные, и сравнивать нельзя. Если для I тоже нет несоответствия, значит массивы совпадают.

Огрехи в логике есть?

olegd ★★★
() автор топика

Ещё вариант — numpy.allclose(). Но она оказалась немного медленнее варианта с (abs(...) < ...).all и даже со stack(). На тестовых файлах без учёта чтения с диска вышло 8,8-10,2 мс против 6,2-6,9 мс и 7,3-7,8 мс. (С учётом: 42 мс против 40 мс и 58-59 мс.)

olegd ★★★
() автор топика
Последнее исправление: olegd (всего исправлений: 2)
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.