История изменений
Исправление
rtxtxtrx,
(текущая версия)
:
~
❯ vim ./test_python.py
Found existing alias for "vim". You should use: "vi"
~ 59s
❯ ./test_python.py
CPUs detected: 12, using workers=16
Parameters: CPU_TASKS=40, CPU_WORK=50000, IO_TASKS=200, IO_SLEEP=0.0200
CPU sequential : 0.234 s
CPU ThreadPoolExecutor : 0.050 s
CPU ProcessPoolExecutor : 0.160 s
IO sequential : 4.017 s
IO ThreadPoolExecutor : 0.264 s
IO ProcessPoolExecutor : 0.340 s
Summary (lower is better):
CPU thread speedup over seq: 4.66
CPU process speedup over seq: 1.47
IO thread speedup over seq: 15.21
IO process speedup over seq: 11.82
~
❯ python --version
Python 3.14.0
~
❯ asdf set python system
~
❯ python --version
Python 3.13.7
~
❯ ./test_python.py
CPUs detected: 12, using workers=16
Parameters: CPU_TASKS=40, CPU_WORK=50000, IO_TASKS=200, IO_SLEEP=0.0200
CPU sequential : 0.186 s
CPU ThreadPoolExecutor : 0.198 s
CPU ProcessPoolExecutor : 0.046 s
IO sequential : 4.017 s
IO ThreadPoolExecutor : 0.263 s
IO ProcessPoolExecutor : 0.278 s
Summary (lower is better):
CPU thread speedup over seq: 0.94
CPU process speedup over seq: 4.06
IO thread speedup over seq: 15.25
IO process speedup over seq: 14.44
~
❯ python -c 'import sys; print(sys._is_gil_enabled())'
True
~
❯ asdf set python latest
~
❯ python -c 'import sys; print(sys._is_gil_enabled())'
False
Пуньк-среньк… Какие ваши оправдания, фанаты питона? Я хочу их выслушать, а то я последнее время пишу на пхп… так как работу найти из-за засилья тупорылых херок с нейродриснявыми ats уже невозможно
test_python.py
#!/usr/bin/env python3
"""
bench_nogil_test.py
Benchmarks:
- CPU-bound: compute many Fibonacci numbers (iterative) or count primes (sieve-lite)
- IO-bound (simulated): time.sleep to simulate blocking I/O
Compares: single-thread, ThreadPoolExecutor, ProcessPoolExecutor.
Usage:
python3 bench_nogil_test.py # usual CPython
python-nogil bench_nogil_test.py # nogil build (name may vary)
"""
import time
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import argparse
import multiprocessing
# --- Tunable benchmark parameters ---
CPU_TASKS = 40  # how many tasks the CPU-bound test submits
CPU_WORK = 50_000  # per-task CPU work: iterations of the sqrt loop in cpu_task
IO_TASKS = 200  # how many tasks the IO-bound test submits
IO_SLEEP = 0.02  # seconds each simulated-I/O task sleeps
# NOTE(review): MAX_WORKERS appears unused below — benchmark_all derives its
# own worker count as min(32, cpu_count() + 4); confirm before relying on it.
MAX_WORKERS = None
# --- Helper functions (CPU-bound examples) ---
def cpu_work_fibonacci(n):
    """Return the n-th Fibonacci number, reduced modulo 10**9.

    Iterative on purpose: recursion would add stack depth and call overhead
    to what is meant to be a pure interpreter-loop workload, and the modulo
    keeps the integers small so arithmetic cost stays constant per step.
    """
    prev, curr = 0, 1
    for _step in range(n):
        prev, curr = curr, (prev + curr) % 1_000_000_000
    return prev
def cpu_task(idx, work=CPU_WORK):
    """CPU-bound task: alternating-sign sqrt series plus a Fibonacci pass.

    Returns (idx, checksum) so a caller can match results to submissions
    even when they complete out of order.
    """
    # sum of +sqrt(i) for odd i and -sqrt(i) for even i; the expression
    # (i & 1) * 2 - 1 maps odd -> +1, even -> -1 without branching.
    total = sum(math.sqrt(i) * ((i & 1) * 2 - 1) for i in range(1, work))
    # A short pure-Python pass to mix bytecode-heavy work into the task.
    return (idx, total + cpu_work_fibonacci(200))
def io_task(idx, sleep_time=IO_SLEEP):
    """Simulate one blocking I/O call by sleeping, then echo idx back."""
    time.sleep(sleep_time)
    return idx
# --- Runner utilities ---
def run_executor(fn, tasks, executor_ctor, workers):
    """Fan tasks out over an executor; return (elapsed_seconds, results).

    A tuple task is splatted into positional arguments; any other task is
    passed as a single argument.  Results are collected in completion order,
    and the elapsed time includes executor shutdown (the `with` exit).
    """
    t0 = time.perf_counter()
    with executor_ctor(max_workers=workers) as pool:
        pending = []
        for task in tasks:
            args = task if isinstance(task, tuple) else (task,)
            pending.append(pool.submit(fn, *args))
        gathered = [fut.result() for fut in as_completed(pending)]
    return time.perf_counter() - t0, gathered
def run_sequential(fn, tasks):
    """Call fn on every task, one at a time; return (elapsed_seconds, results).

    Uses the same argument convention as run_executor: tuples are splatted
    into positional arguments, anything else is passed as a single argument.
    """
    t0 = time.perf_counter()
    collected = []
    for task in tasks:
        args = task if isinstance(task, tuple) else (task,)
        collected.append(fn(*args))
    return time.perf_counter() - t0, collected
def summarize(name, elapsed):
    """Print one timing row: label left-padded to 30 columns, then seconds."""
    print("{:30s}: {:.3f} s".format(name, elapsed))
def benchmark_all(
    cpu_tasks=CPU_TASKS,
    cpu_work=CPU_WORK,
    io_tasks=IO_TASKS,
    io_sleep=IO_SLEEP,
    workers=None,
):
    """Time the CPU and IO workloads sequentially, with threads, and with
    processes; print per-configuration timings and speedups over sequential.
    """
    n_cpus = multiprocessing.cpu_count()
    if workers is None:
        # Mirrors ThreadPoolExecutor's documented default: min(32, cpus + 4).
        workers = min(32, (n_cpus or 1) + 4)
    print(f"\nCPUs detected: {n_cpus}, using workers={workers}")
    print(
        "Parameters: CPU_TASKS=%d, CPU_WORK=%d, IO_TASKS=%d, IO_SLEEP=%.4f\n"
        % (cpu_tasks, cpu_work, io_tasks, io_sleep)
    )
    cpu_args = [(i, cpu_work) for i in range(cpu_tasks)]
    io_args = [(i, io_sleep) for i in range(io_tasks)]

    # CPU-bound workload: sequential baseline, then both executor types.
    t_seq_cpu, _ = run_sequential(cpu_task, cpu_args)
    summarize("CPU sequential", t_seq_cpu)
    t_thr_cpu, _ = run_executor(cpu_task, cpu_args, ThreadPoolExecutor, workers)
    summarize("CPU ThreadPoolExecutor", t_thr_cpu)
    t_prc_cpu, _ = run_executor(cpu_task, cpu_args, ProcessPoolExecutor, workers)
    summarize("CPU ProcessPoolExecutor", t_prc_cpu)

    # IO-bound workload: the same three configurations.
    t_seq_io, _ = run_sequential(io_task, io_args)
    summarize("IO sequential", t_seq_io)
    t_thr_io, _ = run_executor(io_task, io_args, ThreadPoolExecutor, workers)
    summarize("IO ThreadPoolExecutor", t_thr_io)
    t_prc_io, _ = run_executor(io_task, io_args, ProcessPoolExecutor, workers)
    summarize("IO ProcessPoolExecutor", t_prc_io)

    print("\nSummary (lower is better):")
    print(f" CPU thread speedup over seq: {t_seq_cpu / t_thr_cpu:.2f}")
    print(f" CPU process speedup over seq: {t_seq_cpu / t_prc_cpu:.2f}")
    print(f" IO thread speedup over seq: {t_seq_io / t_thr_io:.2f}")
    print(f" IO process speedup over seq: {t_seq_io / t_prc_io:.2f}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Simple nogil performance microbench")
    # Every knob defaults to its module-level constant.
    for flag, kind, fallback in (
        ("--cpu-tasks", int, CPU_TASKS),
        ("--cpu-work", int, CPU_WORK),
        ("--io-tasks", int, IO_TASKS),
        ("--io-sleep", float, IO_SLEEP),
        ("--workers", int, None),
    ):
        parser.add_argument(flag, type=kind, default=fallback)
    args = parser.parse_args()
    # Rebind the module globals so any reader of these names sees the
    # effective run parameters.  NOTE: cpu_task/io_task defaults were bound
    # at def time, so these rebinds do not change those defaults.
    CPU_TASKS = args.cpu_tasks
    CPU_WORK = args.cpu_work
    IO_TASKS = args.io_tasks
    IO_SLEEP = args.io_sleep
    benchmark_all(
        cpu_tasks=CPU_TASKS,
        cpu_work=CPU_WORK,
        io_tasks=IO_TASKS,
        io_sleep=IO_SLEEP,
        workers=args.workers,
    )
Исходная версия
rtxtxtrx,
:
~
❯ vim ./test_python.py
Found existing alias for "vim". You should use: "vi"
~ 59s
❯ ./test_python.py
CPUs detected: 12, using workers=16
Parameters: CPU_TASKS=40, CPU_WORK=50000, IO_TASKS=200, IO_SLEEP=0.0200
CPU sequential : 0.234 s
CPU ThreadPoolExecutor : 0.050 s
CPU ProcessPoolExecutor : 0.160 s
IO sequential : 4.017 s
IO ThreadPoolExecutor : 0.264 s
IO ProcessPoolExecutor : 0.340 s
Summary (lower is better):
CPU thread speedup over seq: 4.66
CPU process speedup over seq: 1.47
IO thread speedup over seq: 15.21
IO process speedup over seq: 11.82
~
❯ python --version
Python 3.14.0
~
❯ asdf set python system
~
❯ python --version
Python 3.13.7
~
❯ ./test_python.py
CPUs detected: 12, using workers=16
Parameters: CPU_TASKS=40, CPU_WORK=50000, IO_TASKS=200, IO_SLEEP=0.0200
CPU sequential : 0.186 s
CPU ThreadPoolExecutor : 0.198 s
CPU ProcessPoolExecutor : 0.046 s
IO sequential : 4.017 s
IO ThreadPoolExecutor : 0.263 s
IO ProcessPoolExecutor : 0.278 s
Summary (lower is better):
CPU thread speedup over seq: 0.94
CPU process speedup over seq: 4.06
IO thread speedup over seq: 15.25
IO process speedup over seq: 14.44
test_python.py
#!/usr/bin/env python3
"""
bench_nogil_test.py
Benchmarks:
- CPU-bound: compute many Fibonacci numbers (iterative) or count primes (sieve-lite)
- IO-bound (simulated): time.sleep to simulate blocking I/O
Compares: single-thread, ThreadPoolExecutor, ProcessPoolExecutor.
Usage:
python3 bench_nogil_test.py # usual CPython
python-nogil bench_nogil_test.py # nogil build (name may vary)
"""
import time
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import argparse
import multiprocessing
# --- Tunable benchmark parameters ---
CPU_TASKS = 40  # how many tasks the CPU-bound test submits
CPU_WORK = 50_000  # per-task CPU work: iterations of the sqrt loop in cpu_task
IO_TASKS = 200  # how many tasks the IO-bound test submits
IO_SLEEP = 0.02  # seconds each simulated-I/O task sleeps
# NOTE(review): MAX_WORKERS appears unused below — benchmark_all derives its
# own worker count as min(32, cpu_count() + 4); confirm before relying on it.
MAX_WORKERS = None
# --- Helper functions (CPU-bound examples) ---
def cpu_work_fibonacci(n):
    """Return the n-th Fibonacci number, reduced modulo 10**9.

    Iterative on purpose: recursion would add stack depth and call overhead
    to what is meant to be a pure interpreter-loop workload, and the modulo
    keeps the integers small so arithmetic cost stays constant per step.
    """
    prev, curr = 0, 1
    for _step in range(n):
        prev, curr = curr, (prev + curr) % 1_000_000_000
    return prev
def cpu_task(idx, work=CPU_WORK):
    """CPU-bound task: alternating-sign sqrt series plus a Fibonacci pass.

    Returns (idx, checksum) so a caller can match results to submissions
    even when they complete out of order.
    """
    # sum of +sqrt(i) for odd i and -sqrt(i) for even i; the expression
    # (i & 1) * 2 - 1 maps odd -> +1, even -> -1 without branching.
    total = sum(math.sqrt(i) * ((i & 1) * 2 - 1) for i in range(1, work))
    # A short pure-Python pass to mix bytecode-heavy work into the task.
    return (idx, total + cpu_work_fibonacci(200))
def io_task(idx, sleep_time=IO_SLEEP):
    """Simulate one blocking I/O call by sleeping, then echo idx back."""
    time.sleep(sleep_time)
    return idx
# --- Runner utilities ---
def run_executor(fn, tasks, executor_ctor, workers):
    """Fan tasks out over an executor; return (elapsed_seconds, results).

    A tuple task is splatted into positional arguments; any other task is
    passed as a single argument.  Results are collected in completion order,
    and the elapsed time includes executor shutdown (the `with` exit).
    """
    t0 = time.perf_counter()
    with executor_ctor(max_workers=workers) as pool:
        pending = []
        for task in tasks:
            args = task if isinstance(task, tuple) else (task,)
            pending.append(pool.submit(fn, *args))
        gathered = [fut.result() for fut in as_completed(pending)]
    return time.perf_counter() - t0, gathered
def run_sequential(fn, tasks):
    """Call fn on every task, one at a time; return (elapsed_seconds, results).

    Uses the same argument convention as run_executor: tuples are splatted
    into positional arguments, anything else is passed as a single argument.
    """
    t0 = time.perf_counter()
    collected = []
    for task in tasks:
        args = task if isinstance(task, tuple) else (task,)
        collected.append(fn(*args))
    return time.perf_counter() - t0, collected
def summarize(name, elapsed):
    """Print one timing row: label left-padded to 30 columns, then seconds."""
    print("{:30s}: {:.3f} s".format(name, elapsed))
def benchmark_all(
    cpu_tasks=CPU_TASKS,
    cpu_work=CPU_WORK,
    io_tasks=IO_TASKS,
    io_sleep=IO_SLEEP,
    workers=None,
):
    """Time the CPU and IO workloads sequentially, with threads, and with
    processes; print per-configuration timings and speedups over sequential.
    """
    n_cpus = multiprocessing.cpu_count()
    if workers is None:
        # Mirrors ThreadPoolExecutor's documented default: min(32, cpus + 4).
        workers = min(32, (n_cpus or 1) + 4)
    print(f"\nCPUs detected: {n_cpus}, using workers={workers}")
    print(
        "Parameters: CPU_TASKS=%d, CPU_WORK=%d, IO_TASKS=%d, IO_SLEEP=%.4f\n"
        % (cpu_tasks, cpu_work, io_tasks, io_sleep)
    )
    cpu_args = [(i, cpu_work) for i in range(cpu_tasks)]
    io_args = [(i, io_sleep) for i in range(io_tasks)]

    # CPU-bound workload: sequential baseline, then both executor types.
    t_seq_cpu, _ = run_sequential(cpu_task, cpu_args)
    summarize("CPU sequential", t_seq_cpu)
    t_thr_cpu, _ = run_executor(cpu_task, cpu_args, ThreadPoolExecutor, workers)
    summarize("CPU ThreadPoolExecutor", t_thr_cpu)
    t_prc_cpu, _ = run_executor(cpu_task, cpu_args, ProcessPoolExecutor, workers)
    summarize("CPU ProcessPoolExecutor", t_prc_cpu)

    # IO-bound workload: the same three configurations.
    t_seq_io, _ = run_sequential(io_task, io_args)
    summarize("IO sequential", t_seq_io)
    t_thr_io, _ = run_executor(io_task, io_args, ThreadPoolExecutor, workers)
    summarize("IO ThreadPoolExecutor", t_thr_io)
    t_prc_io, _ = run_executor(io_task, io_args, ProcessPoolExecutor, workers)
    summarize("IO ProcessPoolExecutor", t_prc_io)

    print("\nSummary (lower is better):")
    print(f" CPU thread speedup over seq: {t_seq_cpu / t_thr_cpu:.2f}")
    print(f" CPU process speedup over seq: {t_seq_cpu / t_prc_cpu:.2f}")
    print(f" IO thread speedup over seq: {t_seq_io / t_thr_io:.2f}")
    print(f" IO process speedup over seq: {t_seq_io / t_prc_io:.2f}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Simple nogil performance microbench")
    # Every knob defaults to its module-level constant.
    for flag, kind, fallback in (
        ("--cpu-tasks", int, CPU_TASKS),
        ("--cpu-work", int, CPU_WORK),
        ("--io-tasks", int, IO_TASKS),
        ("--io-sleep", float, IO_SLEEP),
        ("--workers", int, None),
    ):
        parser.add_argument(flag, type=kind, default=fallback)
    args = parser.parse_args()
    # Rebind the module globals so any reader of these names sees the
    # effective run parameters.  NOTE: cpu_task/io_task defaults were bound
    # at def time, so these rebinds do not change those defaults.
    CPU_TASKS = args.cpu_tasks
    CPU_WORK = args.cpu_work
    IO_TASKS = args.io_tasks
    IO_SLEEP = args.io_sleep
    benchmark_all(
        cpu_tasks=CPU_TASKS,
        cpu_work=CPU_WORK,
        io_tasks=IO_TASKS,
        io_sleep=IO_SLEEP,
        workers=args.workers,
    )