LINUX.ORG.RU

Проблема с многопоточным Perl скриптом при запуске через SSH


0

1

Есть проблема с простым Perl-скриптом:
---------------------------------------------------------------------
use strict;
use warnings;
use threads;
use threads::shared;
use POSIX;

my $print_mutex : shared;

#####################################################################

sub _print($)
{
my $str = shift;
lock($print_mutex);
my $id = threads->tid();
my $time = strftime('%H:%M:%S', localtime time);
print «$time [$id] $str»;
return;
}

#####################################################################

sub run()
{
for my $i (1 .. 3)
{
_print(«Begin $i\n»);
sleep 1;
_print(«End $i\n»);
}
return threads->tid();
}

#####################################################################

_print «Starting test.\n»;
my @threads;
for my $thr_num (1 .. 2)
{
my $thr = threads->create('run');
push @threads, $thr;
_print «Thread created.\n»;
}
foreach (@threads)
{
my $id = $_->join;
_print «Thread '$id' finished.\n»;
}
_print «Test finished.\n»;

#####################################################################
---------------------------------------------------------------------

Когда запускаю его на Linux с Perl-5.10 все работает корректно:

$ perl /tmp/a.pl
14:25:54 [0] Starting test.
14:25:54 [0] Thread created.
14:25:54 [1] Begin 1
14:25:54 [0] Thread created.
14:25:54 [2] Begin 1
14:25:55 [1] End 1
14:25:55 [1] Begin 2
14:25:55 [2] End 1
14:25:55 [2] Begin 2
14:25:56 [1] End 2
14:25:56 [1] Begin 3
14:25:56 [2] End 2
14:25:56 [2] Begin 3
14:25:57 [1] End 3
14:25:57 [0] Thread '1' finished.
14:25:57 [2] End 3
14:25:57 [0] Thread '2' finished.
14:25:57 [0] Test finished.
$

А вот когда я запускаю этот же скрипт через SSH, то вижу какой-то бред (смотрите на таймстампы и ID потоков):

$ ssh localhost 'perl /tmp/a.pl'
14:26:11 [0] Starting test.
14:26:11 [0] Thread created.
14:26:11 [1] Begin 1
14:26:12 [1] End 1
14:26:12 [1] Begin 2
14:26:13 [1] End 2
14:26:13 [1] Begin 3
14:26:14 [1] End 3
14:26:11 [2] Begin 1
14:26:12 [2] End 1
14:26:12 [2] Begin 2
14:26:13 [2] End 2
14:26:13 [2] Begin 3
14:26:14 [2] End 3
14:26:11 [0] Thread created.
14:26:14 [0] Thread '1' finished.
14:26:14 [0] Thread '2' finished.
14:26:14 [0] Test finished.
$

Никогда не видел ничего подобного на однопоточных Perl-скриптах и заметил что проблема с I/O начинается после создания первого потока.

Проверял этот же скрипт на Windows с Perl-5.12 и получил те же результаты. Так что похоже что проблема не с конкретной версией Perl или OS.

В чем тут дело?

А где flush для stdout? То что ты сделал print, совершенно не обязывает перл сразу же выводить печатываемое на экран.

И да, используй LORCODE. Читать без отступов и подсветки не очень удобно.

Vovka-Korovka ★★★★★ ()

дадада! верно говорят! $|++ ;)
кажется, ничего не перепутал )

aol ★★★★★ ()
Ответ на: комментарий от Vovka-Korovka

Я добавил $| = 1; в начало скрипта и к своему удивлению обнаружил что проблема ушла.
Непонимание, однако, осталось.

Если я понимаю правильно, если мы НЕ используем flush, то данные накапливаются в буфере и выводятся когда буфер заполняется или (в случае line-buffered) если встречается newline. Наконец close вызывает flush.

Если это верно, то да, ожидать вывода немедленно без явного flush'а неверно. Но я совсем не об этом и говорил.
Мой основной вопрос - почему время в таймстампах в выводе не возрастает монотонно? Пусть даже вывод буфферизируется, но порядок записи должен сохраняться.

Krivenok_Dmitry ()
Ответ на: комментарий от Krivenok_Dmitry

Мой основной вопрос - почему время в таймстампах в выводе не возрастает монотонно? Пусть даже вывод буфферизируется, но порядок записи должен сохраняться.

Во - первых, следует понимать, что потоки в перле это вещь достаточно специфичная, непохожая на потоки в других языках. При создании нового потока клонируется интепретатор и каждый поток работает со своей версией интерпретатора. В следствие этого, изменение переменных в одном потоке обычно изменяет переменную только в одном интерпретаре, не затрагивая другие. Чтобы расшарить переменную между потоками нужно делать дополнительные усилия, используя модуль threads::shared.

Во-вторых, ввод-вывод в перле буферизован. Это значит, что после вызова print текст сначала попадает в буфер и лишь после наполнения буфера будет выведен на экран.

А теперь вспоминаем про про «во-первых» и догадываемся, что этот самый буфер для stdout - свой для каждого потока.

Все вышесказанное следует учитывать и при чтении из файла. В перле, чтобы читать в несколько потоков из одного файла, нужно создавать отдельный поток который будет читать из файла строки и засовывать в расшаренный массив, а остальные потоки уже будут брать эти строки из массива.

Vovka-Korovka ★★★★★ ()
Ответ на: комментарий от Krivenok_Dmitry

В догонку ломающий мозг пример, когда несколько потоков пытаются читать из одного файла

#!/usr/bin/perl -w 

use strict;
use threads;

die "Cannot open input file\n" unless open(FILE, 'test.txt');

my $thr1 = threads->create(sub{
                                 my $line = <FILE>;
                                 if ($line)
                                 {
                                     print "Thread 1: line = $line";
                                 }
                                 else
                                 {
                                     print "Thread 1: file is empty\n" 
                                 };
                                 sleep(2);
                             }
                         );
my $thr2 = threads->create(sub{
                                sleep(1);
                                my $line = <FILE>;
                                if ($line)
                                {
                                    print "Thread 2: line = $line";
                                }
                                else
                                {
                                    print "Thread 2: file is empty\n";
                                }
                              }
                          );
my $thr3 = threads->create(sub{
                                sleep(3);  
                                my $line = <FILE>;
                                if ($line)
                                {
                                    print "Thread 3: line = $line";
                                }    
                                else 
                                {
                                    print " Thread 3: file is empty\n";
                                }    
                              }
                          );
$thr1->join();
$thr2->join();
$thr3->join();

Входной файл test.txt содержит всего две строчки

$ cat ./test.txt 
1
2

А теперь вывод программы

$ ./example.pl 
Thread 1: line = 1
Thread 2: file is empty
Thread 3: line = 2
Vovka-Korovka ★★★★★ ()
Ответ на: комментарий от Vovka-Korovka

Для print забыл autoflush добавить. Но не беда :-) Надеюсь идея понятна.

Vovka-Korovka ★★★★★ ()
Ответ на: комментарий от Vovka-Korovka

На самом деле мысль про thread-local-buffer была у меня с самого начала, но я все же надеялся, что это не так.
Помоему это просто идиотский дизайн, но не в этом суть.

Если не сложно, можешь кинуть линк на доку где об этом говорится?

Спасибо!

Krivenok_Dmitry ()
Ответ на: комментарий от Krivenok_Dmitry

На самом деле мысль про thread-local-buffer была у меня с самого начала, но я все же надеялся, что это не так. Помоему это просто идиотский дизайн, но не в этом суть.

Да дизайн - то не идиотский. То что у каждого потока свой интепретатор позволяет не использовать блокировку интерпретатора как в питоне. В итоге перловские потоки получаются быстрее. Правда их неудобно использовать.

Почитать можно и perldoc http://perldoc.perl.org/perlthrtut.html

Хотя про грабли с вводом/выводом там и не описано. Я просто сам как-то на эти грабли наступил, с тех пор ученый.

Vovka-Korovka ★★★★★ ()

Через ssh и однопоточная выдаёт вывод только по окончании.

Olegymous ★★ ()
Ответ на: комментарий от Olegymous

>use sysread, Luke.

Насколько я понял из описания, он читает определенное количество байт. То есть построчно читать не получится. А иногда хочется именно построчно.

Vovka-Korovka ★★★★★ ()
Ответ на: комментарий от Vovka-Korovka

К слову про твой ломающий мозг пример.
Хорошая ведь штука strace - открывает завесу тайны :)
Далее следует вывод с комментами:

...
# ОСНОВНОЙ ПОТОК ОТКРЫВАЕТ ФАЙЛ - ДЕСКРИПТОР 3
18:58:10.847956 open(«test.txt», O_RDONLY) = 3
...
# ДАЛЕЕ СОЗДАЕТ 1-й ПОТОК С PID 31910 (У ПАРЕНТА 31909)
18:58:10.870185 clone(Process 31910 attached
child_stack=0x422c5260, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, p
arent_tidptr=0x422c59e0, tls=0x422c5950, child_tidptr=0x422c59e0) = 31910
[pid 31909] 18:58:10.870433 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
...
# ПЕРВЫЙ ПОТОК ЧИТАЕТ 4 БАЙТА - ТО ЕСТЬ ВСЕ СОДЕРЖИМОЕ ФАЙЛА
[pid 31910] 18:58:10.881052 read(3, «1\n2\n»..., 4096) = 4
# РАПОРТУЕТ ЧТО ПРОЧИТАЛ СТРОКУ
[pid 31910] 18:58:10.881239 write(1, «Thread 1: line = 1\n»..., 19Thread 1: line = 1
) = 19
...
# И УХОДИТ СПАТЬ НА 2 СЕКУНДЫ
[pid 31910] 18:58:10.881595 nanosleep({2, 0}, <unfinished ...>
...
# СОЗДАЕТСЯ 2-й ПОТОК С ПИДОМ 31911
[pid 31909] 18:58:10.884129 clone(Process 31911 attached (waiting for parent)
Process 31911 resumed (parent 31909 ready)
child_stack=0x42ac6260, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x42ac69e0, tls=0x42ac6950, child_tidptr=0x42ac69e0) = 31911
...
# 2-й ПОТОК УХОДИТ В СЛИП НА 1 СЕКУНДУ
[pid 31911] 18:58:10.885460 nanosleep({1, 0}, <unfinished ...>
...
# СОЗДАЕТСЯ 3-й ПОТОК С ПИДОМ 31912
[pid 31909] 18:58:10.892336 clone(Process 31912 attached
child_stack=0x432c7260, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x432c79e0, tls=0x432c7950, child_tidptr=0x432c79e0) = 31912
...
# ПАРЕНТ ВИСИТ НА FUTEX (ТО ЕСТЬ ДЕЛАЕТ JOIN В КОДЕ)
[pid 31909] 18:58:10.892642 futex(0x422c59e0, FUTEX_WAIT, 31910, NULL <unfinished ...>
...
# 3-й ЗАСЫПАЕТ НА 3 СЕКУНДЫ
[pid 31912] 18:58:10.893129 nanosleep({3, 0}, <unfinished ...>
...
# ВТОРОЙ ВЫХОДИТ ИЗ СНА
[pid 31911] 18:58:11.885599 <... nanosleep resumed> {1, 0}) = 0
# ЧИТАЕТ И ВИДИТ ЧТО ФАЙЛ ТО ПУСТОЙ!
[pid 31911] 18:58:11.885765 read(3, ""..., 4096) = 0
# РАПОРТУЕТ ОБ ЭТОМ
[pid 31911] 18:58:11.885857 write(1, «Thread 2: file is empty\n»..., 24Thread 2: file is empty
) = 24
...
# И НАКОНЕЦ 2-й ЗАВЕРШАЕТСЯ
[pid 31911] 18:58:11.885986 rt_sigprocmask(SIG_BLOCK, ~[ILL BUS SEGV RTMIN RT_1], NULL, 8) = 0
[pid 31911] 18:58:11.886087 _exit(0) = ?
Process 31911 detached
...
# ПЕРВЫЙ ВЫХОДИТ ИЗ СНА НА 2 СЕКУНДЫ
[pid 31910] 18:58:12.881739 <... nanosleep resumed> {2, 0}) = 0
[pid 31910] 18:58:12.881949 rt_sigprocmask(SIG_BLOCK, ~[ILL BUS SEGV RTMIN RT_1], NULL, 8) = 0
...
# !!!
# И ВОТ ТУТ ЭТА СВОЛОЧЬ ДЕЛАЕТ LSEEK НА 2-й БАЙТ В ФАЙЛЕ
# ЭМУЛИРУЯ ПРИ ЭТОМ ЧТЕНИЕ ИМ ТОЛЬКО ОДНОЙ СТРОКИ
# !!!
[pid 31910] 18:58:12.882045 lseek(3, 2, SEEK_SET) = 2
[pid 31910] 18:58:12.882131 lseek(3, 0, SEEK_CUR) = 2
# И 1-й БЛАГОПОЛУЧНО ЗАВЕРШАЕТСЯ
[pid 31910] 18:58:12.882253 _exit(0) = ?
Process 31910 detached
...
# ПАРЕНТ ПРОСЫПАЕТСЯ ИЗ JOIN'a
[pid 31909] 18:58:12.882358 <... futex resumed> ) = 0
...
# ЗАТЕМ СНОВА ЛОЧИТСЯ НА ДРУГОМ JOIN'е
[pid 31909] 18:58:12.884694 futex(0x432c79e0, FUTEX_WAIT, 31912, NULL <unfinished ...>
...
# НАКОНЕЦ ПРОСЫПАЕТСЯ 3-й
[pid 31912] 18:58:13.893356 <... nanosleep resumed> {3, 0}) = 0
# И ДЕЛАЕТ READ, КОТОРЫЙ ЧИТАЕТ 2 БАЙТА НАЧИНАЯ СО 2-го
[pid 31912] 18:58:13.894442 read(3, «2\n»..., 4096) = 2
# И РАПОРТУЕТ ОБ ЭТОМ
[pid 31912] 18:58:13.894590 write(1, «Thread 3: line = 2\n»..., 19Thread 3: line = 2
) = 19


Вот как-то так все и объясняется.
Как всегда все через одно место сделано.

Krivenok_Dmitry ()
Ответ на: комментарий от Vovka-Korovka

Никто не мешает сделать простенький враппер

sub getline {
	my $fh = shift;
	
	my $buf = '';
	my $char;
	while (sysread($fh, $char, 1)) {
		substr($buf, length $buf) = $char;
		last if $char eq $/;
	}
	
	return $buf;
}

Olegymous ★★ ()
Ответ на: комментарий от Krivenok_Dmitry

А трейс аналога на C можете прокомментировать?

#include <stdio.h>
#include <unistd.h>
#include <stdint.h> 
#include <pthread.h>

FILE *fh;

void *thread_sub(void *n) {
	int number = (int)(intptr_t)n;
	char line[10];
	
	if (number == 2) sleep(1);
	else if (number == 3) sleep(3);
	
	if (fgets(line, 9, fh)) {
		printf("Thread %d: line = %s", number, line);
	}
	else {
		printf("Thread %d: file is empty\n", number);
	}
	
	if (number == 1) sleep(2);
}

int main() {
	pthread_t threads[3];
	int i;
	
	fh = fopen("test.txt", "r");
	
	for (i=0; i<3; i++) {
		pthread_create(&threads[i], NULL, thread_sub, (void *)(intptr_t)(i+1));
	}
	
	void **rv;
	for (i=0; i<3; i++) {
		pthread_join(threads[i], rv);
	}
	
	return 0;
}

Olegymous ★★ ()
Ответ на: комментарий от Vovka-Korovka

Не справился с управлением

Из меня ещё тот писатель на C. Так вероятно задумывалось

#include <stdio.h>
#include <unistd.h>
#include <stdint.h> 
#include <pthread.h>

FILE *fh;

void *thread_sub(void *n) {
	int number = (int)(intptr_t)n;
	char line[10];
	
	if (number == 2) sleep(1);
	else if (number == 3) sleep(3);
	
	if (fgets(line, 9, fh)) {
		printf("Thread %d: line = %s", number, line);
	}
	else {
		printf("Thread %d: file is empty\n", number);
	}
	
	if (number == 1) sleep(2);
}

int main() {
	pthread_t threads[3];
	int i;
	
	fh = fopen("test.txt", "r");
	
	for (i=0; i<3; i++) {
		pthread_create(&threads[i], NULL, thread_sub, (void *)(intptr_t)(i+1));
	}
	
	for (i=0; i<3; i++) {
		pthread_join(threads[i], NULL);
	}
	
	return 0;
}

Olegymous ★★ ()
Ответ на: Не справился с управлением от Olegymous

На C имеем

$ ./a.out
Thread 1: line = a
Thread 2: line = b
Thread 3: file is empty
$

Тут 1-й поток читает все и рапортует о чтении 1-й строки:
[pid 10024] 23:27:24.453882 read(3, <unfinished ...>
[pid 10024] 23:27:24.453935 <... read resumed> «a\nb\n», 4096) = 4
[pid 10024] 23:27:24.454232 write(1, «Thread 1: line = a\n», 19

2-й поток вообще не читает (не делает read), у берет данные из буфера
[pid 10025] 23:27:25.454987 write(1, «Thread 2: line = b\n», 19

3-й же обнаруживает, что буфер пуст и делает read, возвращающий 0
[pid 10026] 23:27:27.454998 read(3, "", 4096) = 0
[pid 10026] 23:27:27.455126 write(1, «Thread 3: file is empty\n», 24

Я бы вообще не миксовал потоки и библиотеки ввода-вывода.
Лучше уж использовать syscall'ы и выполнять явную синхронизацию где это нужно.

Krivenok_Dmitry ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.