LINUX.ORG.RU

GLib проблема с потоками GMainLoop

 


1

1

Всем привет! Опять я в ступоре, мысли кончились, на сей раз проблема с GMainLoop и блокированием потока. Есть не-default GMainContext, к которому приаттачены отдельными потоками секундный таймер и слушающий TCP-сервер. Проблема в том, что когда подключается клиент, таймер перестает работать. Отключается клиент - работа таймера восстанавливается. Как побороть данную проблему? Ниже привожу полный код.

main.h:

/*
 * main.h
 *
 *  Created on: Jan 15, 2014
 *      Author: xuch
 */

#ifndef MAIN_H_
#define MAIN_H_

using namespace std;

#include <stdio.h>
#include <glib.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <signal.h>
#include <syslog.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>

GMainLoop *main_loop;

static int daemon_proc;

typedef void Sigfunc(int);   /* for signal handlers */
#define SA struct sockaddr
#define LISTENQ         1024

class Dat
{
public:
	Dat(void);
	int Status;
};


class ControlServer
{
public:
	Dat *CData;
	GMainContext *Ctx;
	ControlServer(Dat *, GMainContext *, const char *, const char *);
	int GThStart();

private:
  char host[254];
  char port[254];

  static bool Handler(GIOChannel *, GIOCondition, gpointer);
  static bool GStartStatic(gpointer);
  static bool GStartStatic2(gpointer);
  int TcpListen(const char *, const char *, socklen_t *);
  void ErrQuit(const char *, ...);
  static void ErrSys(const char *, ...);
  static void ErrDoIt(int, int, const char *, va_list);
  void Listen(int, int);
  int Close(int);
  int SetSockOpt(int, int, int, const void *, int);
};

class Parent
{
public:
	GMainLoop *MainLoop;
	Parent(const char *, const char *);
	~Parent(void);
	void Start(size_t);
	void Init(void);
	void StartTimer(void);
private:
	Dat *Data;
	GMainContext *Context;
	ControlServer *Srv;
	static gboolean TimeoutCallback(gpointer);
	static gboolean TimeoutCallback2(gpointer);
	static gpointer SecTimer(gpointer);
};

#endif /* MAIN_H_ */

main.cpp:

/*
 * main.cpp
 *
 *  Created on: Jan 15, 2014
 *      Author: xuch
 */

#include "main.h"

using namespace std;

int main(void)
{
	Parent *Par;

	printf("Starting main...\n");
	Par = new Parent("127.0.0.1", "54321");
	Par->StartTimer();
	Par->RunMainLoop();
	delete Par;
}

Parent::Parent(const char *ip, const char *port)
{
	Data = new Dat();
	Context = g_main_context_new();
	Srv = new ControlServer(Data, Context, ip, port);
	MainLoop = g_main_loop_new(Context, false);
}

Parent::~Parent()
{
	delete Data;
	delete Srv;
}

void Parent::Start(size_t interval)
{
	g_thread_init(NULL);

	Srv->GThStart();	// Start listen TCP Server
	main_loop = g_main_loop_new(NULL, FALSE);
	g_timeout_add_seconds(interval, TimeoutCallback, Data);
	g_main_loop_run(main_loop);
}

void Parent::RunMainLoop()
{
	g_main_loop_run(MainLoop);
}

gpointer Parent::SecTimer(gpointer data)
{
	Parent *Prnt = (Parent *) data;
	GSource *Source = g_timeout_source_new(1000);	// 1 second interval
	g_source_set_callback(Source, TimeoutCallback, Prnt->Data, NULL);
	g_source_attach(Source, Prnt->Context);
	g_source_unref(Source);
	return NULL;
}

gboolean Parent::TimeoutCallback(gpointer data)
{
	Dat *Data = (Dat *) data;
	Data->Status++;
	printf("%ld-> Timeout Callback! Status now: '%d'\n", time(NULL), Data->Status);
	return TRUE;
}

void Parent::StartTimer()
{
	GError *error = NULL;

	g_thread_init(NULL);

	Srv->GThStart();	// Start listen TCP Server

	GThread *t = g_thread_create(SecTimer, this, TRUE, &error);
	g_thread_join(t);
}

ControlServer::ControlServer(Dat *data, GMainContext * gtx, const char *ip, const char *prt)
{
	CData = data;
	Ctx = gtx;
	strcpy(host, ip);
	strcpy(port, prt);
}

bool ControlServer::Handler(GIOChannel *in, GIOCondition condition, gpointer data)
{
  Dat *Dt = (Dat *) data;
  struct sockaddr_storage income;
  int insock, newsock;
  socklen_t income_len;
  struct sockaddr peer;
  socklen_t size;
  int count = 0;

  insock = g_io_channel_unix_get_fd(in);
  income_len = sizeof(income);
  newsock = accept(insock, (struct sockaddr *) &income, &income_len);

  size = sizeof(peer);
  getpeername(newsock, &peer, &size);
  struct sockaddr_in *ipv4 = (struct sockaddr_in *) &peer;

  printf("%ld-> New Client connected -> ", time(NULL));
  printf("Host IP:Port '%s:%d'\n", inet_ntoa(ipv4->sin_addr), ipv4->sin_port);

  do
  {
	  const int len = 1024 * 1024;
	  char buf[len + 1];
	  bzero(buf, len + 1);
	  count = recv(newsock, buf, len, 0);
	  buf[len] = '\0';
	  Dt->Status++;
	  printf("%ld-> Received string: '%s'; Dat.Status = %d\n", time(NULL), buf, Dt->Status);

  } while (count > 0);

  printf("%ld-> Connection closed for Host IP:Port '%s:%d'; Dat.Status = %d\n", time(NULL), inet_ntoa(ipv4->sin_addr), ipv4->sin_port, Dt->Status);

  return true;
}

bool ControlServer::GStartStatic(gpointer data)
{
  ControlServer *CServer = (ControlServer *) data;

  int listenfd;
  socklen_t addr_len;
  GIOChannel *Channel;
  GSource *Source;

  listenfd = CServer->TcpListen(CServer->host, CServer->port, &addr_len);
  Channel = g_io_channel_unix_new(listenfd);
  Source = g_io_create_watch(Channel, G_IO_IN);
  g_source_set_callback(Source, (GSourceFunc) Handler, (gpointer) CServer->CData, NULL);
  g_source_attach(Source, CServer->Ctx);
  g_source_unref(Source);

  return false;
}

int ControlServer::GThStart()
{
	GError *error;
	GThread *t = g_thread_create((GThreadFunc) GStartStatic, this, TRUE, &error);
	g_thread_join(t);
	return 0;
}

void ControlServer::ErrDoIt(int errno_flag, int level, const char *fmt, va_list ap)
{
  int errno_save, n;
  char buf[254 + 1];

  errno_save = errno;

#ifdef HAVE_VSNPRINTF
  vsnprintf(buf, CHAR_BUF_SIZE, fmt, ap);
#else
  vsprintf(buf, fmt, ap);
#endif
  n = strlen(buf);
  if (errno_flag)
    snprintf(buf + n, 254 - n, ": %s", strerror(errno_save));
  strcat(buf, "\n");

  if (daemon_proc)
    {
      syslog(level, buf);
    }
  else
    {
      fflush(stdout);
      fputs(buf, stderr);
      fflush(stderr);
    }
  return;
}

void ControlServer::ErrQuit(const char *fmt, ...)
{
  va_list ap;

  va_start(ap, fmt);
  ErrDoIt(0, LOG_ERR, fmt, ap);
  va_end(ap);
  exit(1);
}

void ControlServer::ErrSys(const char *fmt, ...)
{
  va_list ap;

  va_start(ap, fmt);
  ErrDoIt(1, LOG_ERR, fmt, ap);
  va_end(ap);
  exit(1);
}

int ControlServer::Close(int fd)
{
  return close(fd);
}

int ControlServer::SetSockOpt(int sock_fd, int level, int opt_name, const void *opt_val, int opt_len)
{
  return setsockopt(sock_fd, level, opt_name, opt_val, opt_len);
}

void ControlServer::Listen(int fd, int backlog)
{
  char *ptr;

  if ((ptr = getenv("LISTENQ")) != NULL)
    backlog = atoi(ptr);

  if (listen(fd, backlog) < 0)
    ErrSys("listen error");
}

int ControlServer::TcpListen(const char *host, const char *serv, socklen_t *addrlenp)
{
  int listen_fd, n;
  const int on = 1;
  struct addrinfo hints, *res, *res_save;

  bzero(&hints, sizeof(struct addrinfo));
  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;

  if ((n = getaddrinfo(host, serv, &hints, &res)) != 0)
    {
      ErrQuit("TcpListen error for %s. %s: %s\n", host, serv, gai_strerror(n));
    }
  res_save = res;

  do
    {
      listen_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
      if (listen_fd < 0)
        continue;

      SetSockOpt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
      if (bind(listen_fd, res->ai_addr, res->ai_addrlen) == 0)
        break;

      Close(listen_fd);

    } while ((res = res->ai_next) != NULL);

  if (res == NULL)
    {
      ErrSys("TcpListen error for %s, %s", host, serv);
    }

  Listen(listen_fd, LISTENQ);
  if (addrlenp)
    *addrlenp = res->ai_addrlen;
  return listen_fd;
}

Dat::Dat()
{
	Status = 0;
}



Последнее исправление: cetjs2 (всего исправлений: 1)

Glib, сишное api, libc, sys/socket.h и плюсы - ну ты понял да. Есть же кутишечка и бусты - юзай, если нужны плюсы. Ты привратил 30строчек в адскую лапну на полусишканедоплюсахсклассами, которую никто в здравом уме пытаться прочесть не будет.

anonymous
()

Код особо не читал. Возможно, проблема в том, что ControlServer::Handler не возвращает управление, а делает блокирующее чтение из сокета в цикле.

Если да, вместо этого нужно в нем регистрировать обработчик на новый сокет, в котором уже читать данные по мере их поступления. Если glib использовать не обязательно, советую взять libev или libevent, на их примере будет проще разобраться с подобной задачей, в документации есть примеры.

Еще для C++ вроде есть glibmm.

gv
()
Ответ на: комментарий от gv

Я конечно понимаю, что хочется задающего вопрос назвать идиотом, но «адская лапна» возникла не от нечего делать и не по простой дурости. Спасибо за предложения, но glib к использованию обязателен, т.к. основа приложения это gsdtreamer. Собственно поэтому такая мешанина и получилась. QT использовать не могу - приложение под встраивамый одноплатный компьютер на ARM-процессоре, не могу отдать столько ресурсов под QT. Смысл в том, что buswatch от gstreamer'a нужно так связать с таймером и TCP-сервером на GLib, чтобы последний не блокировал первых двух. Если я реализую таймер вне пространства GLib, то возникает проблема с тем, что все переменные экземпляров классов, измененные в пределах GMainLoop, в обработчике данного таймера будут иметь неактуальные значения, в частности, имеющие значения после инициализации. TCP-сервер был реализован на Си, однако также выяснилось, что в его методах также читаются неактуализированные значения переменных. Поэтому TCP-сервер был интегрирован в пространство GLib. Само собой, что когда все заработает, я адаптирую Сишный код сервера в GLib'овский код. Вопрос во взаимодействии этих процессов и разнесении потоков, а не частные вопросы стиля программного кода.

Xuch
() автор топика

попробуй в цикле с recv() руками дергать g_main_context_iteration()

ananas ★★★★★
()

Если у тебя уже Glib, то почему ты не используешь GIO для работы с сетевым вводом-выводом в рамках GMainLoop?

yoghurt ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.