makuosan.c
86badd32
 /*
c858e9b6
  * makuosan.c
dbdc4e5a
  * Copyright (C) 2008 KLab Inc.
86badd32
  */
 #include "makuosan.h"
 
 void recv_timeout(mfile *m)
 {
5fe10f4f
   mhost   *t;
   uint8_t *r;
fc430629
   if(!m){
     return;
86badd32
   }
fc430629
   do{
     for(t=members;t;t=t->next){
       r = get_hoststate(t, m);
       if(*r == MAKUO_RECVSTATE_NONE){
eeef442d
         member_del_message(1, t, "receive time out");
fc430629
         member_del(t);
         break;
       }
     }
   }while(t); 
69d91074
   m->retrycnt = MAKUO_SEND_RETRYCNT;
86badd32
 }
 
d38d0833
 void pingpong(int n)
86badd32
 {
eeef442d
   mfile *m = mfins(MFSEND);
86badd32
   mping *p = NULL;
634a23d7
   char buff[MAKUO_HOSTNAME_MAX + 1];
86badd32
 
   if(!m){
d38d0833
     lprintf(0, "[error] %s: out of memmory\r\n", __func__);
     return;
86badd32
   }
   m->mdata.head.reqid  = getrid();
   m->mdata.head.seqno  = 0;
   m->mdata.head.szdata = 0;
   m->sendwait          = 0;
   if(gethostname(buff, sizeof(buff)) == -1){
     buff[0] = 0;
   }
   p = (mping *)(m->mdata.data);
   p->hostnamelen = strlen(buff);
c510892b
   p->versionlen  = strlen(PACKAGE_VERSION);
86badd32
   m->mdata.head.szdata = sizeof(mping) + p->hostnamelen + p->versionlen;
   m->mdata.p = p->data;
   memcpy(m->mdata.p, buff, p->hostnamelen);
   m->mdata.p += p->hostnamelen;
c510892b
   memcpy(m->mdata.p, PACKAGE_VERSION, p->versionlen);
86badd32
   m->mdata.p += p->versionlen;
   p->hostnamelen = htons(p->hostnamelen);
   p->versionlen  = htons(p->versionlen);
d38d0833
   gettimeofday(&lastpong, NULL);
   switch(n){
     case 0:
       m->mdata.head.opcode = MAKUO_OP_PING;
       break;
     case 1:
       m->mdata.head.opcode = MAKUO_OP_PING;
       m->mdata.head.flags |= MAKUO_FLAG_ACK;
       break;
     case 2:
       m->mdata.head.opcode = MAKUO_OP_EXIT;
       msend(m);
       break;
   } 
86badd32
 }
 
bd023776
 int mfdirchk(mfile *d){
   mfile *m;
810b92ac
   int len = strlen(d->fn);
bd023776
   if(!len){
     return(1);
   }
   if(d->fn[len - 1] == '/'){
     len--;
   }
17b77ef8
   if(d->mdata.head.flags & MAKUO_FLAG_ACK){
     return(1);
   }
642a6cb2
   for(m=mftop[MFSEND];m;m=m->next){
bd023776
     if(m == d){
       continue;
     }
3b736b7c
     if(m->comm != d->comm){
       continue;
     }
bd023776
     if(strlen(m->fn) < len){
       continue;
     }
     if(!memcmp(d->fn, m->fn, len)){
       if(m->fn[len] == '/'){
         return(0);
       }
     }
   }
   return(1);
 }
 
d38d0833
 int is_send(mfile *m)
86badd32
 {
5fe10f4f
   if(!m){
86badd32
     return(0);
   }
eeef442d
   if(m->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){
     return(0);
   }
   if(m->mdata.head.flags & MAKUO_FLAG_ACK){
facf6c53
     return(1);
eeef442d
   }
d38d0833
   switch(m->mdata.head.opcode){
     case MAKUO_OP_SEND:
     case MAKUO_OP_DEL:
       if(!S_ISLNK(m->fs.st_mode) && S_ISDIR(m->fs.st_mode)){
         if(!mfdirchk(m)){
           return(0);
         }
       }
       break;
86badd32
   }
d38d0833
   if(!ack_check(m, MAKUO_RECVSTATE_NONE)){
86badd32
     m->sendwait = 0;
   }
5fe10f4f
   if(m->sendwait){
     if(!mtimeout(&(m->lastsend), MAKUO_SEND_TIMEOUT)){
d38d0833
       return(0);
5fe10f4f
     }
     if(!(m->retrycnt)){
86badd32
       recv_timeout(m);
     }
   }
5fe10f4f
   return(1);
86badd32
 }
 
21e2082d
 void rfdset(int s, fd_set *fds)
 {
   FD_SET(s, fds);
 }
 
d38d0833
 void wfdset(int s, fd_set *fds)
86badd32
 {
810b92ac
   mfile *m;
d38d0833
   for(m=mftop[MFSEND];m;m=m->next){
     if(is_send(m)){
       FD_SET(s, fds);
       return;
     }
   }
 }
 
21e2082d
 void cfdset(mcomm *c, fd_set *rfds, fd_set *wfds)
 {
   int i;
 
   /*----- listen socket -----*/
   if(moption.lisocket != -1){
     FD_SET(moption.lisocket, rfds);
   }
 
   /*----- connect socket -----*/
   for(i=0;i<MAX_COMM;i++){
     if(c[i].fd[0] != -1){
       FD_SET(c[i].fd[0], rfds);
       if(c[i].working){
         FD_SET(c[i].fd[0], wfds);
       }
     }
     if(c[i].fd[1] != -1){
       FD_SET(c[i].fd[1], rfds);
     }else{
       if(c[i].cpid){
         if(waitpid(c[i].cpid, NULL, WNOHANG) == c[i].cpid){
           c[i].cpid = 0;
         }
       }
     }
   }
 }
 
d38d0833
 int do_select(fd_set *rfds, fd_set *wfds)
 {
86badd32
   struct timeval tv;
d38d0833
   tv.tv_sec  = 1;
   tv.tv_usec = 0;
   if(select(1024, rfds, wfds, NULL, &tv) <= 0){
     gettimeofday(&curtime, NULL);
     moption.sendready = 0;
     return(-1);
   }
   gettimeofday(&curtime, NULL);
   moption.sendready = FD_ISSET(moption.mcsocket, wfds);
   return(0);
 }
fc430629
 
d38d0833
 void do_pong()
 {
   if(mtimeout(&lastpong, MAKUO_PONG_INTERVAL)){
     pingpong(1);
   }
 }
fc430629
 
21e2082d
 void do_free()
 {
   mrecv_gc();
 }
 
d38d0833
 void do_recv()
 {
   while(mrecv());
 }
1ed29a26
 
d38d0833
 void do_send()
 {
   int  i=0;
   mfile *m;
   mfile *n;
1ed29a26
 
d38d0833
   for(m=mftop[MFSEND];m;m=n){
     if(i == moption.parallel){
       return;
     }
     n = m->next;
     if(m->mdata.head.flags & MAKUO_FLAG_ACK){
       msend(m);
       continue;
     }
     if(!is_send(m)){
       if(m->sendwait){
         i++;
810b92ac
       }
d38d0833
       continue;
810b92ac
     }
d38d0833
     msend(m);
     i++;
   }
 }
1ed29a26
 
3bd4a8ab
 void do_exechk(mcomm *c){
   int    i;
   mfile *m;
   for(i=0;i<MAX_COMM;i++){
     if(c[i].working && !c[i].cpid && (c[i].fd[1] == -1)){
       for(m=mftop[MFSEND];m;m=m->next){
         if(m->comm == &c[i]){
           break; /* working */
         }
       }
       if(!m){
         workend(&c[i]);
       }
     }
   }
 }
 
 int do_accept(mcomm *c, fd_set *fds)
 {
   int i;
   int s = moption.lisocket;
   if(s == -1){
     return(0);
   }
   if(!FD_ISSET(s,fds)){
     return(0);
   }
   for(i=0;i<MAX_COMM;i++){
     if(c[i].fd[0] == -1){
       break;
     }
   }
   if(i==MAX_COMM){
     close(accept(s, NULL, 0)); 
     lprintf(0, "[error] %s: can't accept reached in the maximum\n");
     return(1);
   }
   c[i].addrlen = sizeof(c[i].addr);
   c[i].fd[0] = accept(s, (struct sockaddr *)(&c[i].addr), &(c[i].addrlen));
642a6cb2
   lprintf(1, "%s: socket[%d] from %s\n", __func__, i, inet_ntoa(c[i].addr.sin_addr));
3bd4a8ab
   c[i].working = 1;
   return(0);
 }
 
 int do_comexe(mcomm *c, fd_set *fds){
   int i, j;
   mfile *m;
   for(i=0;i<MAX_COMM;i++){
     for(j=0;j<2;j++){
       if(c[i].fd[j] != -1){
         if(FD_ISSET(c[i].fd[j], fds) || c[i].check[j]){
           mexec(&c[i], j);
         }
       }
     }
   }
   return(0);
 }
 
d38d0833
 void mloop()
 {
   fd_set rfds;
   fd_set wfds;
   while(loop_flag){
810b92ac
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
5e37bc10
     rfdset(moption.mcsocket, &rfds);
d38d0833
     wfdset(moption.mcsocket, &wfds);
21e2082d
     cfdset(moption.comm, &rfds, &wfds);
d38d0833
     if(do_select(&rfds, &wfds)){
       do_pong();
21e2082d
       do_free();
     }else{
       do_pong();
       do_recv();
       do_send();
3bd4a8ab
       do_accept(moption.comm, &rfds);
       do_comexe(moption.comm, &rfds);
642a6cb2
       do_exechk(moption.comm);
fc430629
     }
86badd32
   }
 }
 
 int main(int argc, char *argv[])
 {
   minit(argc,argv);
d38d0833
   pingpong(0);
86badd32
   mloop();
d38d0833
   pingpong(2);
86badd32
   mexit();
   return(0);
 }