/* * makuosan.c * Copyright (C) 2008-2012 KLab Inc. */ #include "makuosan.h" void recv_timeout(mfile *m) { mhost *t; uint8_t *r; if(!m){ return; } do{ for(t=members;t;t=t->next){ r = get_hoststate(t, m); if(*r == MAKUO_RECVSTATE_NONE){ member_del_message(1, t, "receive time out"); member_del(t); break; } } }while(t); m->retrycnt = MAKUO_SEND_RETRYCNT; } void pingpong(int n) { int i; unsigned int s; struct timeval tv; mfile *m = mfins(MFSEND); mping *p = NULL; char buff[MAKUO_HOSTNAME_MAX + 1]; if(!m){ lprintf(0, "[error] %s: out of memmory\r\n", __func__); return; } m->mdata.head.reqid = getrid(); m->mdata.head.seqno = 0; m->mdata.head.szdata = 0; m->sendwait = 0; if(gethostname(buff, sizeof(buff)) == -1){ buff[0] = 0; } p = (mping *)(m->mdata.data); p->hostnamelen = strlen(buff); p->versionlen = strlen(PACKAGE_VERSION); m->mdata.head.szdata = sizeof(mping) + p->hostnamelen + p->versionlen; m->mdata.p = p->data; memcpy(m->mdata.p, buff, p->hostnamelen); m->mdata.p += p->hostnamelen; memcpy(m->mdata.p, PACKAGE_VERSION, p->versionlen); m->mdata.p += p->versionlen; p->hostnamelen = htons(p->hostnamelen); p->versionlen = htons(p->versionlen); gettimeofday(&lastpong, NULL); switch(n){ case 0: for(i=0;buff[i];i++){ s += buff[i]; } gettimeofday(&tv,NULL); s += tv.tv_usec; srand(s); m->mdata.head.opcode = MAKUO_OP_PING; break; case 1: m->mdata.head.opcode = MAKUO_OP_PING; m->mdata.head.flags |= MAKUO_FLAG_ACK; break; case 2: m->mdata.head.opcode = MAKUO_OP_EXIT; msend(m); break; } } int mfdirchk(mfile *d){ mfile *m; int len = strlen(d->fn); if(!len){ return(1); } if(d->fn[len - 1] == '/'){ len--; } if(d->mdata.head.flags & MAKUO_FLAG_ACK){ return(1); } for(m=mftop[MFSEND];m;m=m->next){ if(m == d){ continue; } if(m->comm != d->comm){ continue; } if(strlen(m->fn) < len){ continue; } if(!memcmp(d->fn, m->fn, len)){ if(m->fn[len] == '/'){ return(0); } } } return(1); } int is_send(mfile *m) { if(!m){ return(0); } if(m->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){ return(0); } if(m->mdata.head.flags & MAKUO_FLAG_ACK){ return(1); } switch(m->mdata.head.opcode){ case MAKUO_OP_SEND: case MAKUO_OP_DEL: if(!S_ISLNK(m->fs.st_mode) && S_ISDIR(m->fs.st_mode)){ if(!mfdirchk(m)){ return(0); } } break; } if(!ack_check(m, MAKUO_RECVSTATE_NONE)){ m->sendwait = 0; } if(m->sendwait){ if(!mtimeout(&(m->lastsend), MAKUO_SEND_TIMEOUT)){ return(0); } if(!(m->retrycnt)){ recv_timeout(m); } } return(1); } void rfdset(int s, fd_set *fds) { FD_SET(s, fds); } void wfdset(int s, fd_set *fds) { mfile *m; for(m=mftop[MFSEND];m;m=m->next){ if(is_send(m)){ FD_SET(s, fds); return; } } } void cfdset(mcomm *c, fd_set *rfds, fd_set *wfds) { int i; /*----- listen socket -----*/ if(moption.lisocket != -1){ FD_SET(moption.lisocket, rfds); } /*----- connect socket -----*/ for(i=0;i<MAX_COMM;i++){ if(c[i].fd[0] != -1){ FD_SET(c[i].fd[0], rfds); if(c[i].working){ FD_SET(c[i].fd[0], wfds); } } if(c[i].fd[1] != -1){ FD_SET(c[i].fd[1], rfds); }else{ if(c[i].cpid){ if(waitpid(c[i].cpid, NULL, WNOHANG) == c[i].cpid){ c[i].cpid = 0; } } } } } int do_select(fd_set *rfds, fd_set *wfds) { struct timeval tv; tv.tv_sec = 1; tv.tv_usec = 0; if(select(1024, rfds, wfds, NULL, &tv) <= 0){ gettimeofday(&curtime, NULL); moption.sendready = 0; return(-1); } gettimeofday(&curtime, NULL); moption.sendready = FD_ISSET(moption.mcsocket, wfds); return(0); } void do_pong() { static uint32_t interval = MAKUO_PONG_INTERVAL; if(mtimeout(&lastpong, interval)){ interval = MAKUO_PONG_INTERVAL; interval += ((rand() % 31) - 15) * 1000; pingpong(1); } } void do_free() { mrecv_gc(); } void do_recv() { int c = 0; mhost *t; for(t=members;t;t=t->next){ c++; } while(mrecv()){ if(c){ c--; }else{ break; } } } void do_send() { int i=0; mfile *m; mfile *n; for(m=mftop[MFSEND];m;m=n){ if(i == moption.parallel){ return; } n = m->next; if(m->mdata.head.flags & MAKUO_FLAG_ACK){ msend(m); continue; } if(!is_send(m)){ if(m->sendwait){ i++; } continue; } msend(m); i++; } } int do_accept(mcomm *c, fd_set *fds) { int i; int s = moption.lisocket; struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); char buff[1024]; if(s == -1){ return(0); } if(!FD_ISSET(s,fds)){ return(0); } for(i=0;i<MAX_COMM;i++){ if(c[i].fd[0] == -1){ break; } } if(i==MAX_COMM){ close(accept(s, NULL, 0)); lprintf(0, "[error] %s: can't accept reached in the maximum\n"); return(1); } c[i].fd[0] = accept(s, (struct sockaddr *)&ss, &sslen); switch(ss.ss_family){ case AF_UNIX: lprintf(5, "%s: socket=%d from %s\n", __func__, i, moption.uaddr.sun_path); break; case AF_INET: case AF_INET6: buff[0]=0; getnameinfo((const struct sockaddr *)&ss, sizeof(ss), buff, sizeof(buff), NULL, 0, NI_NUMERICHOST); lprintf(5, "%s: socket=%d from %s\n", __func__, i, buff); break; } c[i].working = 1; if(fcntl(c[i].fd[0], F_SETFL , O_NONBLOCK)){ lprintf(0, "[error] %s: fcntl: %s\n", __func__, strerror(errno)); } return(0); } int do_comexe(mcomm *c, fd_set *fds){ int i, j; for(i=0;i<MAX_COMM;i++){ for(j=0;j<2;j++){ if(c[i].fd[j] != -1){ if(FD_ISSET(c[i].fd[j], fds) || c[i].check[j]){ mexec(&c[i], j); } } } } return(0); } void do_exechk(mcomm *c){ int i; mfile *m; for(i=0;i<MAX_COMM;i++){ if(c[i].working){ if(c[i].isalive){ if(mtimeout(&(c[i].tv), (uint32_t)1000)){ cprintf(0, &(c[i]), "alive\n"); mtimeget(&(c[i].tv)); } } if((c[i].cpid == 0) && (c[i].fd[1] == -1)){ for(m=mftop[MFSEND];m;m=m->next){ if(m->comm == &c[i]){ break; /* working */ } } if(!m){ workend(&c[i]); } } } } } void mloop() { fd_set rfds; fd_set wfds; while(loop_flag){ FD_ZERO(&rfds); FD_ZERO(&wfds); rfdset(moption.mcsocket, &rfds); wfdset(moption.mcsocket, &wfds); cfdset(moption.comm, &rfds, &wfds); if(do_select(&rfds, &wfds)){ do_pong(); do_free(); }else{ do_pong(); do_recv(); do_send(); do_accept(moption.comm, &rfds); do_comexe(moption.comm, &rfds); do_exechk(moption.comm); } if(log_level != moption.loglevel){ lprintf(0, "%s: loglevel change %d to %d\n", __func__, moption.loglevel, log_level); moption.loglevel = log_level; } } } int main(int argc, char *argv[]) { minit(argc,argv); pingpong(0); mloop(); pingpong(2); mexit(); return(0); }