LINUX.ORG.RU

Помогите разобраться с sendmsg & recvmsg.


0

1

Пишу программу на массивно параллельной системе с расделенной памятью. Суть программы решение системы из 4 уравнений гармонических колебаний. Клиент работает нормально и дорабатывает правильно, вся соль в том, как потом передать массивы данных с 4 процессоров на сервер. Не могу понять почему функции sendmsg & recvmsg не работают.Прилагаю код.(Mpi не предлагать)

Клиент:

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <resolv.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>

int sock;
struct sockaddr_in addr;
/////////////////////////
union num
{
	double val;
	char st[8];
}value;
double points;
double z11,z22,x11,x22;
double z1=0,z2=0,x1=0,x2=0;
double omega1,omega2;
double i;
char buffer[9];
//////////////////////
char buf_msg[3333][9];
struct msghdr msg;
struct iovec io[3333];
/////////////////////////
int main(int argc, char *argv[])
{
    char adrout1[13]="10.148.0.254";
    char adrout2[13]="10.148.0.";
    char buf[80];
    int bytes_read;
    int nd=0,j;
    int pointer=0;
	int fails;
	////////////////////
	z11=0;
	z22=0;
	omega1=atof(argv[1]);
	x11=atof(argv[2]);
	omega2=atof(argv[3]);
	x22=atof(argv[4]);
	points=atof(argv[5]);
	nd=atoi(argv[6]);
	switch(nd)
	{
	case 1:
		strcat(adrout2,"2");
		break;
	case 2:
		strcat(adrout2,"1");
		break;
	case 3:
		strcat(adrout2,"5");
		break;
	case 5:
		strcat(adrout2,"3");
		break;
	}
	/////////////////
    sock = socket(AF_INET, SOCK_DGRAM, 0);
    if(sock < 0)
    {
        perror("socket");
        exit(1);
    }
    addr.sin_family = AF_INET;
    addr.sin_port = htons(3426);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        perror("bind");
        exit(2);
    }
	////////////////////////////////////
    bzero(&msg,sizeof(msg));
    msg.msg_name="10.148.0.254";
    msg.msg_namelen=sizeof("10.148.0.254");
    while(buf[0]!='G' && buf[1]!='o')
    {
	bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
    }
	
    for(i=0.3,pointer=0;i<=points;i+=0.3,pointer++)
		switch(nd)
		{
		case 1:
			if(i!=0.3)
			{
				bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
				buf[bytes_read]='\0';
				strncpy(value.st,buf+1,8);
				x11=value.val;
			}
			z1=z11+0.1*x11;
			z11=z1;
			value.val=z11;
			buffer[0]='1';
			strncpy(buffer+1,value.st,8);
			buffer[9]='\0';
			addr.sin_family = AF_INET;
    			addr.sin_port = htons(3426);
			inet_aton("10.148.0.2",&addr.sin_addr);//10.148.0.254
			sendto(sock, buffer, 9, 0, (struct sockaddr *)&addr, sizeof(addr));
			io[pointer].iov_base=buf_msg[pointer];
			sprintf(buf_msg[pointer],buffer);
			io[pointer].iov_len=strlen(buffer);
			break;
		case 2:
			if(i!=0.3)
			{
				bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
				buf[bytes_read]='\0';
				strncpy(value.st,buf+1,8);
				z11=value.val;
			}
			x1=x11-0.1*omega1*omega1*z11;
			x11=x1;
			value.val=x11;
			buffer[0]='2';
			strncpy(buffer+1,value.st,8);
			buffer[9]='\0';
			addr.sin_family = AF_INET;
    			addr.sin_port = htons(3426);
    			inet_aton("10.148.0.1",&addr.sin_addr);//10.148.0.254
			sendto(sock, buffer, 9, 0, (struct sockaddr *)&addr, sizeof(addr));			
			io[pointer].iov_base=buf_msg[pointer];
			sprintf(buf_msg[pointer],buffer);
			io[pointer].iov_len=strlen(buffer);
			break;
		case 3:
			if(i!=0.3)
			{
				bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
				buf[bytes_read]='\0';
				strncpy(value.st,buf+1,8);
				x22=value.val;
			}
			z2=z22+0.1*x22;
			z22=z2;
			value.val=z22;
			buffer[0]='3';
			strncpy(buffer+1,value.st,8);
			buffer[9]='\0';
			addr.sin_family = AF_INET;
    			addr.sin_port = htons(3426);
    			inet_aton("10.148.0.5",&addr.sin_addr);//10.148.0.254
			sendto(sock, buffer, 9, 0, (struct sockaddr *)&addr, sizeof(addr));
			io[pointer].iov_base=buf_msg[pointer];
			sprintf(buf_msg[pointer],buffer);
			io[pointer].iov_len=strlen(buffer);
			break;
		case 5:
			if(i!=0.3)
			{
				bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
				buf[bytes_read]='\0';
				strncpy(value.st,buf+1,8);
				z22=value.val;
			}
			x2=x22-0.1*omega2*omega2*z22;
			x22=x2;
			value.val=x22;
			buffer[0]='4';
			strncpy(buffer+1,value.st,8);
			buffer[9]='\0';
			addr.sin_family = AF_INET;
    			addr.sin_port = htons(3426);
    			inet_aton("10.148.0.3",&addr.sin_addr);//10.148.0.254
			sendto(sock, buffer, 9, 0, (struct sockaddr *)&addr, sizeof(addr));
			io[pointer].iov_base=buf_msg[pointer];
			sprintf(buf_msg[pointer],buffer);
			io[pointer].iov_len=strlen(buffer);
			break;
		}

    msg.msg_iov=io;
    msg.msg_iovlen=pointer;
    inet_aton("10.148.0.254",&addr.sin_addr);
    sendmsg(sock,&msg,0);
    close(sock);

    return 0;
}

Сервер:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

union num
{
	double val;
	char st[8];
}value;
double omega1,omega2;
double z1=0,x1=0,z2=0,x2=0;
double points;
//////////////////////////////////////////
struct iovec io[3333];
struct msghdr msg;
char buf_msg[3333][10];
//////////////////////////////////////////
void strst(char *st,int val)
{
	int i;
	for(i=0;i<strlen(st);i++)
		st[i]=val;
}
int parallel()
{
	char msg1[]="nohup ssh node";
	char msg2[]=" 'nohup ./c.exe ";
	char msg3[]="&' > node";
	char msg4[]=".t&";
	char res1[30],res2[80],res3[30];
	char sym[20];
	char o[2];
	int i=0,op=4;
	int fails=0,fl;
	for(i=1;i<=op+fails;i++)
	{
		strst(res2,0);
		strst(res1,0);
		strst(res3,0);
		sprintf(o,"%d",i);
		strcat(res1,"scp c.exe node");
		strcat(res1,o);
		strcat(res1,":");
		fl=system(res1);
		printf("%s\n",res1);
		if(fl==256)
		{	
			fails++;
			strcat(res3,"ssh node");
			strcat(res3,o);
			strcat(res3," 'exit'");
			system(res3);
			goto nd;
		}
		sprintf(sym,"%2.1f %2.1f %2.1f %2.1f %2.1f %d",omega1,x1,omega2,x2,points,i);
		strcat(res2,msg1);
		strcat(res2,o);
		strcat(res2,msg2);
		strcat(res2,sym);
		strcat(res2,msg3);
		strcat(res2,o);
		strcat(res2,msg4);
		system(res2);
		printf("%s\n",res2);
		strcat(res3,"ssh node");
		strcat(res3,o);
		strcat(res3," 'exit'");
		system(res3);
nd:
		printf("%s\n",res3);

	}
	return fails;
}
int main(int argc, char *argv[])
{
    int sock;
    struct sockaddr_in addr;
    char buf[1024];
    int bytes_read;
    FILE *pFile;
    int f=0;
    int i,j;
    double p;
    char msg1[]="Go";
    char fname[20];
	//char msg2[]="End";
///////////////////////////////
	if(argc!=6)
	{
		printf("Error! You must input all parameters!!! Example:\n./p.exe [omega 1] [start value 1] [omega 2] [start value 2] [number of points]\n");
		return 0;
	} 
	z1=0;
	z2=0;
	omega1=atof(argv[1]);
	x1=atof(argv[2]);
	omega2=atof(argv[3]);
	x2=atof(argv[4]);
	points=atof(argv[5]);
////////////////////////////////
    sock = socket(AF_INET, SOCK_DGRAM, 0);
    if(sock < 0)
    {
        perror("socket");
        exit(1);
    }
    
    addr.sin_family = AF_INET;
    addr.sin_port = htons(3426);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        perror("bind");
        exit(2);
    }
    f=parallel();
    printf("OK\n");
    for(j=0;j<points/0.3;j++)
    {
	io[j].iov_base=buf_msg[i];
	io[j].iov_len=sizeof(buf_msg[i]);
    }
/////////////////////////////
    setsockopt(sock,SOL_SOCKET,SO_BROADCAST,(char *)&i,sizeof(i));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(3426);
    inet_aton("10.148.0.255",&addr.sin_addr);
	bytes_read=sendto(sock, msg1, sizeof(msg1), 0,
           (struct sockaddr *)&addr, sizeof(addr));
    if(bytes_read==-1)
    {
    	perror("sendto");
	exit(3);
    }
    printf("Broadcast message was sended!!!\n");
    
///////////////////////
    i=0;
    while(buf[0]!='G' && buf[1]!='o')
        bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
///////////////////////
	printf("start!\n");
       msg.msg_iov=io;
	msg.msg_iovlen=(int)points/0.3;
	for(i=0;i<=4;i++)
	{
		switch(i)
		{
		case 0:
			msg.msg_name=NULL;
			msg.msg_namelen=0;
			sprintf(fname,"output z1.txt");
			pFile=fopen(fname,"w"); 
			fprintf(pFile,"i\tz1\n");
			fprintf(pFile,"%4.1f\t%.5f\n",i,z1);
			break;
		case 1:
			msg.msg_name=NULL;
			msg.msg_namelen=0;
			sprintf(fname,"output x1.txt");
			pFile=fopen(fname,"w"); 
			fprintf(pFile,"i\tx1\n");
			fprintf(pFile,"%4.1f\t%.5f\n",i,x1);
			break;
		case 2:
			msg.msg_name=NULL;
			msg.msg_namelen=0;
			sprintf(fname,"output z2.txt");
			pFile=fopen(fname,"w"); 
			fprintf(pFile,"i\tz2\n");
			fprintf(pFile,"%4.1f\t%.5f\n",i,z2);
			break;
		case 3:
			msg.msg_name=NULL;
			msg.msg_namelen=0;
			sprintf(fname,"output x2.txt");
			pFile=fopen(fname,"w"); 
			fprintf(pFile,"i\tx2\n");
			fprintf(pFile,"%4.1f\t%.5f\n",i,x2);
			break;
		}	
		recvmsg(sock,&msg,0);
		for(p=0.3,j=0;p<=points;p+=0.3,j++)
		{
			fprintf(pFile,"%4.1f\t%s\n",p,msg.msg_iov[j]);
		}
		fclose(pFile);
	}
	//close(sock);    
    return 0;
}

(Mpi не предлагать)

CORBA, RPC
в общем нефиг велосипедить

EugeneBas ★★ ()

Что это такое?

char buf[80];
//...
while(buf[0]!='G' && buf[1]!='o')
    {
	bytes_read = recvfrom(sock, buf, 1024, 0, NULL, NULL);
    }
Во-первых, чтение неинициализированной переменной buf. Во-вторых, игнорирование bytes_read: после каждого recvfrom нужно передвигаться дальше по buf на bytes_read. Хотя, при успешном получении первого же куска, который начинается с «Go», цикл завершится, даже если сообщение получено не полностью. В-третьих, ты выделил buf'у места всего на 80 байт (включая NULL-терминатор при необходимости), а обещаешь recvfrom, что можно использовать 1024 байта!

gag ★★★★★ ()
Последнее исправление: gag (всего исправлений: 1)
Ответ на: комментарий от Harald

тут UDP, а не TCP

ОК, тогда с достаточным размером буфера надо не промахнуться, иначе данные будут безвозвратно обрезаться. Проверить можно по выставленному флагу MSG_TRUNC.

gag ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.