Вопрос

I am trying to make a small server in c++ which would simply echo anything it receives through an infiniband connection. I am also using the Sockets Direct Protocol and POSIX sockets under Ubuntu.

Unfortunately, the only advice i found on the internet was about how i should create the socket using AF_INET_SDP domain, just like here:

#define AF_INET_SDP 27

int socketfd = socket(AF_INET_SDP, SOCK_STREAM, 0);

I manage to bind the socket, i call the listen(...) function but the server hangs when trying to accept connections, while on the client side i only receive a timeout when trying to connect.

I also have an older similar application written in Java (both the client and the server) which communicates through infiniband and it works properly.

Can someone give me an example of an application using infiniband or point me towards some sort of documentation which could help me?

Thanks.

Это было полезно?

Решение

This is a simple example to how use infiniband, I hope it can help u:

#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <pthread.h>
#include <time.h>

#define AF_INET_SDP 27


typedef struct
{
    pthread_attr_t threadattr;
    pthread_t threadid;
    int seq;
    int  fd;
    double sendbytes;
    double runtimes;
}CLIENTTHREADPARA;

CLIENTTHREADPARA * g_pClientThreadPara;


void signal_func(int no);
int SetSignal();

int g_listensock;
int g_bIsExit=0;

int g_packetsize;
int g_totalthread;
char * g_szIp;
int g_iPort;
int g_nDelayUsecs;

pthread_mutex_t g_mutex;


typedef struct
{
    pthread_attr_t threadattr;
    pthread_t threadid;
    int fd;
}THREADDATA;

static double time_so_far();
void * ThreadProc(void * pPara);
int RunServ(int iPort);
int RunClient(char * Ip,int iPort);
void * ClientThreadProc(void * pPara);

int main(int argc,char * argv[])
{
    int iPort;
    int thread_nums;

    SetSignal();

    if(argc != 3 && argc!=6)
    {
        goto err_out;
    }

    g_packetsize=atoi(argv[1]);

    if(argc==3)
    {
        iPort=atoi(argv[2]);
        if(iPort==0)
        {
            printf("socket port can not zero\n");
            return -1;
        }
        RunServ(iPort);
    }
    else if(argc==6)
    {
        g_szIp=argv[2];
        g_iPort=atoi(argv[3]);
        g_totalthread=atoi(argv[4]);
        g_nDelayUsecs=atoi(argv[5]);
        RunClient(argv[2],iPort);
    }
    return 0;

err_out:
    printf("For Server: sdptest <packet size> <port>\n");
    printf("For Client: sdptest <packet size> <ip> <port> <thread_nums> <delay microsecond>\n");
    return -1;
}

int RunClient(char * Ip,int iPort)
{
    int i;
    pthread_mutex_init(&g_mutex, (pthread_mutexattr_t *)0);

    g_pClientThreadPara=(CLIENTTHREADPARA *)malloc(sizeof(CLIENTTHREADPARA)*g_totalthread);
    for(i=0;i<g_totalthread;i++)
    {
        g_pClientThreadPara[i].seq=i;
        pthread_attr_init(&g_pClientThreadPara[i].threadattr);
        pthread_create(&g_pClientThreadPara[i].threadid, &g_pClientThreadPara[i].threadattr,ClientThreadProc,(void *)&g_pClientThreadPara[i]);
    }

    double sendbytes = 0;
    double pre_sendbytes=0;
    double pre_time = time_so_far();
    double current_time = time_so_far();

    while(!g_bIsExit)
    {
        sleep(1);
        pthread_mutex_lock(&g_mutex);
        sendbytes = g_pClientThreadPara[0].sendbytes;
        pthread_mutex_unlock(&g_mutex);
        current_time = time_so_far();        
        printf("speed is %10.2fMbytes/s,\n",(sendbytes - pre_sendbytes)/1024/1024/(current_time - pre_time));
        pre_sendbytes = sendbytes;
        pre_time = current_time;
    }

    for(i=0;i<g_totalthread;i++)
    {
        pthread_join(g_pClientThreadPara[i].threadid, NULL);
    }

    double totalspeed=0;
    double totaltimes=0;

    for(i=0;i<g_totalthread;i++)
    {

        totalspeed+=(double)g_pClientThreadPara[i].sendbytes/g_pClientThreadPara[i].runtimes;
    }
    printf("speed is %10.2fMbytes/s,total times is %.2f seconds\n",totalspeed/1024/1024,totaltimes/g_totalthread);
}

void * ClientThreadProc(void * pPara)
{

    CLIENTTHREADPARA * pThreadPara= (CLIENTTHREADPARA *)pPara;

    struct sockaddr_in  sin;
    int nRet;
    double starttime,endtime;
    int cnt_per_usec;
    char * buf;

    memset(&sin,0,sizeof(sin));
    sin.sin_family=AF_INET_SDP;
    sin.sin_port=htons(g_iPort);

    if((sin.sin_addr.s_addr = inet_addr(g_szIp)) == INADDR_NONE)
    {
        printf("Ip address %s is invalid!\n",g_szIp);
        return (void *)-1;
    }

    pThreadPara->fd = socket(AF_INET_SDP,SOCK_STREAM,0);
    if(pThreadPara->fd < 0)
    {
        perror("Create socket error");
        return (void *)-1;
    }

    nRet=connect(pThreadPara->fd,(struct sockaddr *)&sin,sizeof(sin));
    if(nRet<0)
    {

        printf("Can't connect to %s:%d\n",g_szIp,g_iPort);
        perror("sock error:");
        close(pThreadPara->fd);
        return (void *)-1;
    }

    buf=malloc(g_packetsize);
    starttime=time_so_far();

    pThreadPara->sendbytes=0;
    while(!g_bIsExit)
    {
        nRet=send(pThreadPara->fd,buf,g_packetsize,0);
        if(nRet<=0)
        {
            if(errno==EINTR)
            {
                continue;
            }
            else
            {
                printf("thread %d ",pThreadPara->seq);
                perror("sock error:");
                break;
            }
        }
        else
        {
            pthread_mutex_lock(&g_mutex);
            pThreadPara->sendbytes+=nRet;
            pthread_mutex_unlock(&g_mutex);
        }
        if (g_nDelayUsecs>0)
        {
            usleep(g_nDelayUsecs);
        }
    }
    endtime=time_so_far();
    pThreadPara->runtimes=endtime-starttime;
    close(pThreadPara->fd);
    free(buf);
    //printf("speed is %10.2fM/s\n",g_totalbytes/1024/1024/(endtime - starttime));
    return (void *)0;
}

int RunServ(int iPort)
{
    struct sockaddr_in my_addr;
    struct linger li; 
    int nRet;
    int fromclientfd;

    //0.init
    memset ((char *)&my_addr, 0, sizeof(struct sockaddr_in));
    my_addr.sin_family = AF_INET_SDP;
    my_addr.sin_port = htons(iPort);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    //1.create
    g_listensock = socket(AF_INET_SDP, SOCK_STREAM, 0);
    if (g_listensock==-1)
    {
        return -1;
    }

    li.l_onoff = 1;
    li.l_linger = 0;

    int option=1;
    setsockopt(g_listensock,SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option));
    setsockopt(g_listensock,SOL_SOCKET, SO_LINGER, (char *) &li, sizeof(li));

    //2.bind
    nRet = bind(g_listensock, (struct sockaddr *)&my_addr, sizeof(struct sockaddr));
    if (nRet==-1)
    {
        printf("bind %d port error\n",iPort);
        return -1;
    }

    //3.listen
    nRet = listen(g_listensock, 10);
    if (nRet==-1)
    {
        printf("listen in port %d error!\n",iPort);
        return -1;
    }

    struct sockaddr_in from;
#if AIX || Linux
        socklen_t fromlen=sizeof(from);
#else
        int fromlen=sizeof(from);
#endif


    li.l_onoff = 1;
    li.l_linger = 3;
    while(!g_bIsExit)
    {
        fromclientfd = accept(g_listensock,(struct sockaddr *)&from,&fromlen);
        if (fromclientfd==-1)
        {
            close(g_listensock);
            printf("server is stoped.\n");
            break;;
        }

        setsockopt(fromclientfd,SOL_SOCKET, SO_LINGER, (char *) &li, sizeof(li));
        THREADDATA * pThreadData=(THREADDATA *)malloc(sizeof(THREADDATA));
        pThreadData->fd=fromclientfd;
        pthread_attr_init(&pThreadData->threadattr);
        pthread_create (&pThreadData->threadid, &pThreadData->threadattr,ThreadProc,(void *)pThreadData);
    }
}

void * ThreadProc(void * pPara)
{
    THREADDATA * pThreadPara;
    pThreadPara=(THREADDATA *)pPara;
    int nRet;
    char * buf;
    double starttime,endtime;
    double totalbytes;

    buf=malloc(g_packetsize);
    if(buf==NULL)
    {
        close(pThreadPara->fd);
        printf("Canot not allocate memory!\n");
        return 0;
    }

    nRet=1;
    starttime=time_so_far();
    totalbytes=0;
    while (nRet>0 && !g_bIsExit)
    {
        nRet=recv(pThreadPara->fd ,buf,g_packetsize,0);
        if (nRet<=0)
        {
            break;
        }
        totalbytes+=nRet;
    }
    endtime=time_so_far();
    free(buf);
    free(pThreadPara);
    printf("speed is %10.2fM/s\n",totalbytes/1024/1024/(endtime - starttime));
    return 0;
}

static double time_so_far()
{
#if defined(SysV)
    int        val;
    struct tms tms;

    if ((val = times(&tms)) == -1)
    {
        printf("Call times() error\n");
    }
    return ((double) val) / ((double) sysconf(_SC_CLK_TCK));

#else

    struct timeval tp;

    if (gettimeofday(&tp, (struct timezone *) NULL) == -1)
    {
        printf("Call gettyimeofday error\n");
    }
    return ((double) (tp.tv_sec)) +
           (((double) tp.tv_usec) / 1000000.0);
#endif
}


int SetSignal()
{
    signal(SIGHUP, signal_func);
    signal(SIGQUIT, signal_func);
    signal(SIGBUS, SIG_DFL);

    signal(SIGURG,signal_func); 

    signal(SIGPIPE,SIG_IGN); 

    signal(SIGABRT,SIG_IGN); 

    signal(SIGTRAP,SIG_IGN);

    signal(SIGILL,signal_func); 
    //signal(SIGSEGV,signal_func); 

    //signal(SIGCHLD,SIG_IGN)

    signal(SIGTERM,signal_func); 
    signal(SIGINT, signal_func); //Ctrl+C

    return 0;
}


void signal_func(int no)
{
    switch (no)
    {
    case 1:
        printf("Receive signal SIGHUP.\n");
        break;
    case SIGINT:
        close(g_listensock);
        //printf("Receive Ctrl+C or signal SIGINT, server is stoping....\n");
        g_bIsExit=1;
        break;
    case SIGTERM:
        close(g_listensock);
        printf("Receive kill signal,server is stoping...\n");
        g_bIsExit=1;
        break;
    case SIGQUIT:
        printf("Receive SIGQUIT signal.\n");
        break;

    case SIGABRT:
        close(g_listensock);
        printf("Receive SIGABRT signal.\n");
        break;

    case SIGILL:
        printf("Receive SIGILL signal.\n");
        break;

    case SIGSEGV:
        close(g_listensock);
        printf("Receive SIGSEGV signal.\n");
        g_bIsExit=1;
        break;

    case SIGPIPE:
        printf("Receive SIGPIPE signal.\n");
        break;

    default:
        printf("Receive %d sigial!\n",no);
        break;
    break;
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top