LINUX.ORG.RU

История изменений

Исправление vvn_black, (текущая версия) :

Как-то так на 3.7, если поток не рабочий check выкинет эксепшен, который надо обработать.

import asyncio
import aiohttp


URL = 'http://rtmp.domain.tld:2300'


async def check(url, chunk_size, timeout_sec):
    timeout = aiohttp.ClientTimeout(total=timeout_sec)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=timeout) as resp:
            content = await resp.content.read(chunk_size)
            assert chunk_size == len(content)


asyncio.run(check(URL, 256, 5))

Если хочется request и синхронности, то по сути также.

Исходная версия vvn_black, :

Как-то так на 3.7, если поток не рабочий check выкинет эксепшен, который надо обработать.

import asyncio
import aiohttp


URL = 'http://rtmp.domain.tld:2300'


async def check(url, chunk_size):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            content = await resp.content.read(chunk_size)
            assert chunk_size == len(content)


asyncio.run(check(URL, 256))

Если хочется request и синхронности, то по сути также.