mrecv.c
c858e9b6
 /*
  * mrecv.c
83cbbef8
  * Copyright (C) 2008-2012 KLab Inc.
c858e9b6
  */
86badd32
 #include "makuosan.h"
83cbbef8
 #include <openssl/md5.h>
86badd32
 
634a23d7
 static void mrecv_req(mdata *data, struct sockaddr_in *addr);
 static void mrecv_ack(mdata *data, struct sockaddr_in *addr);
 
 /******************************************************************
 *
 * Receive common functions (private)
 *
 *******************************************************************/
86badd32
 static mfile *mrecv_mfdel(mfile *m)
 {
   mfile *r;
f31a5410
   if(!m){
86badd32
     return(NULL);
f31a5410
   }
86badd32
   r = m->next;
   if(m->fd != -1){
     close(m->fd);
     m->fd = -1;
1ed29a26
     if(!S_ISLNK(m->fs.st_mode) && S_ISREG(m->fs.st_mode)){
86badd32
       mremove(moption.base_dir, m->tn);
abce546a
     }
86badd32
   }
3b736b7c
   if(m->link){
     m->link->link = NULL;
     m->link = NULL;
   }
1b4c64d4
   while((m->mark = delmark(m->mark)));
86badd32
   mfdel(m);
   return(r);
 }
 
634a23d7
 static int mrecv_decrypt(mdata *data, struct sockaddr_in *addr)
 {
   int i;
   MD5_CTX ctx;
   uint8_t hash[16];
 
   if(data->head.flags & MAKUO_FLAG_CRYPT){
     if(!moption.cryptena){
d6e8005b
       lprintf(0, "%s: [warn] encrypt packet from %s. I have not key!\n", __func__, inet_ntoa(addr->sin_addr));
634a23d7
       return(-1);
     }
     if(data->head.szdata){
       for(i=0;i<data->head.szdata;i+=8){
         BF_decrypt((BF_LONG *)(data->data + i), &EncKey);
       }
       MD5_Init(&ctx);
       MD5_Update(&ctx, data->data, data->head.szdata);
       MD5_Final(hash, &ctx);
       if(memcmp(hash,data->head.hash,16)){
d6e8005b
         lprintf(0, "[error] %s: protocol checksum error from %s\n", __func__, inet_ntoa(addr->sin_addr));
634a23d7
         return(-1);
       }
     }
7284b461
   }else{
     if(moption.cryptena){
d6e8005b
       lprintf(0, "%s: [warn] not encrypt packet from %s. I have key!\n", __func__, inet_ntoa(addr->sin_addr));
7284b461
       return(-1);
     }
634a23d7
   }
   return(0);
 }
 
 static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr)
 {
   int recvsize;
   socklen_t addr_len;
 
   while(1){
1ed29a26
     if(!loop_flag){
       return(-1);
     }
634a23d7
     addr_len = sizeof(struct sockaddr_in);
     recvsize = recvfrom(s, data, sizeof(mdata), 0, (struct sockaddr *)addr, &addr_len);
1ed29a26
     if(recvsize == -1){
810b92ac
       if(errno == EAGAIN){
         return(-1);
       }
       if(errno == EINTR){
634a23d7
         continue;
       }else{
d6e8005b
         lprintf(0, "[error] %s: %s recv error\n", __func__, strerror(errno));
634a23d7
         return(-1);
       }
     }
1ed29a26
     if(recvsize < sizeof(data->head)){
d6e8005b
       lprintf(0, "[error] %s: recv head size error from %s\n", __func__, inet_ntoa(addr->sin_addr));
1ed29a26
       return(-1);
     }
     data->head.szdata = ntohs(data->head.szdata);
     data->head.flags  = ntohs(data->head.flags);
     data->head.reqid  = ntohl(data->head.reqid);
     data->head.seqno  = ntohl(data->head.seqno);
36959136
     data->head.maddr  = data->head.maddr;
     data->head.mport  = data->head.mport;
1ed29a26
     data->head.error  = ntohl(data->head.error);
     if(data->head.maddr != moption.maddr.sin_addr.s_addr){
       continue; /* other group packet */
     }
     if(data->head.mport != moption.maddr.sin_port){
       continue; /* other group packet */
     }
     if(data->head.vproto != PROTOCOL_VERSION){
       continue; /* other protocol */
     }
     if(!mrecv_decrypt(data, addr)){
       break;
     }
634a23d7
   }
1ed29a26
   return(0);
634a23d7
 }
 
 /******************************************************************
 *
8f9aeac1
 * ack receive functions (for source node tasks)
634a23d7
 *
 *******************************************************************/
8f9aeac1
 static int mrecv_ack_search(mhost **lpt, mfile **lpm, mdata *data, struct sockaddr_in *addr)
634a23d7
 {
   mhost *t;
   mfile *m;
   *lpt = NULL;
   *lpm = NULL;
   t = member_add(&addr->sin_addr, NULL);
   if(!t){
8f9aeac1
     lprintf(0, "%s: member not found %s\n", __func__, inet_ntoa(addr->sin_addr));
     return(-1);
634a23d7
   }
1029a28c
   for(m=mftop[0];m;m=m->next){
     if(m->mdata.head.reqid == data->head.reqid){
634a23d7
       break;
1029a28c
     }
   }
634a23d7
   if(!m){
8f9aeac1
     return(-1);
634a23d7
   }
   *lpt = t;
   *lpm = m;
aa2b6bc8
   mtimeget(&m->lastrecv);
8f9aeac1
   return(0);
634a23d7
 }
 
69b027c3
 static void mrecv_ack_report(mfile *m, mhost *t, mdata *data)
634a23d7
 {
   if(data->head.nstate == MAKUO_RECVSTATE_OPENERROR){
69b027c3
     cprintf(0, m->comm, "error: %s %s:%s\n", strerror(data->head.error), t->hostname, m->fn);
d6e8005b
     lprintf(0, "[error] %s: %s rid=%06d %s %s:%s\n", 
f31a5410
       __func__,
       strerror(data->head.error),
       data->head.reqid, 
       strrstate(data->head.nstate), 
69b027c3
       t->hostname, 
f31a5410
       m->fn);
634a23d7
   }
   if(data->head.nstate == MAKUO_RECVSTATE_WRITEERROR){
69b027c3
     cprintf(0, m->comm, "error: %s %s:%s\n", strerror(data->head.error), t->hostname, m->fn);
d6e8005b
     lprintf(0, "[error] %s: %s rid=%06d %s %s:%s\n", 
f31a5410
       __func__,
       strerror(data->head.error),
       data->head.reqid, 
       strrstate(data->head.nstate), 
69b027c3
       t->hostname, 
f31a5410
       m->fn);
634a23d7
   }
   if(data->head.nstate == MAKUO_RECVSTATE_CLOSEERROR){
69b027c3
     cprintf(0, m->comm, "error: close error %s:%s\n", t->hostname, m->fn);
d6e8005b
     lprintf(0, "[error] %s: close error rid=%06d %s %s:%s\n", 
f31a5410
       __func__,
       data->head.reqid, 
       strrstate(data->head.nstate), 
69b027c3
       t->hostname, 
f31a5410
       m->fn);
634a23d7
   }
 }
 
 static void mrecv_ack_ping(mdata *data, struct sockaddr_in *addr)
 {
   member_add(&addr->sin_addr, data);
 }
 
481bf811
 static void mrecv_ack_send_mark(mdata *data, mfile *m, mhost *t)
 {
f31a5410
   uint32_t l;
   uint32_t h;
   data->p = data->data;
   while(!data_safeget32(data, &l)){
ba2caa89
     if(data_safeget32(data, &h)){
       break;
     }
f31a5410
     seq_setmark(m, l, h);
481bf811
   }
 }
 
abce546a
 static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr)
634a23d7
 {
aa2b6bc8
   mhost *t;
   mfile *m;
8f9aeac1
   if(mrecv_ack_search(&t, &m, data, addr)){
     return;
   }
5fe10f4f
   if(data->head.nstate == MAKUO_RECVSTATE_MARK){
ba2caa89
     if(m->mdata.head.nstate == MAKUO_SENDSTATE_MARK){
       mrecv_ack_send_mark(data, m, t);
634a23d7
     }
   }
481bf811
   if(data->head.nstate == MAKUO_RECVSTATE_OPEN){
ba2caa89
     if(m->mdata.head.nstate == MAKUO_SENDSTATE_DATA){
       mrecv_ack_send_mark(data, m, t);
481bf811
       return;
abce546a
     }
   }
aa2b6bc8
   if(!set_hoststate(t, m, data->head.nstate)){
facf6c53
     lprintf(0, "[error] %s: host state error\n", __func__);
481bf811
   }
634a23d7
   mrecv_ack_report(m, t, data);
 }
 
 static void mrecv_ack_md5(mdata *data, struct sockaddr_in *addr)
 {
abce546a
   uint8_t *s;
   mhost   *t;
   mfile   *m;
aa2b6bc8
   if(mrecv_ack_search(&t, &m, data, addr)){
634a23d7
     return;
   }
abce546a
   s = get_hoststate(t,m);
   if(!s){
     lprintf(0, "%s: not allocate state area\n", __func__);
     return;
   }
   if(*s != data->head.nstate){
634a23d7
     if(data->head.nstate == MAKUO_RECVSTATE_MD5OK){
       cprintf(1, m->comm, "%s: OK %s\r\n", t->hostname, m->fn);
8f5ac56e
       lprintf(8,          "%s: OK %s:%s\n", __func__, t->hostname, m->fn);
634a23d7
     }
     if(data->head.nstate == MAKUO_RECVSTATE_MD5NG){
       cprintf(0, m->comm, "%s: NG %s\r\n", t->hostname, m->fn);
abce546a
       lprintf(0,          "%s: NG %s:%s\n", __func__, t->hostname, m->fn);
634a23d7
     }
   }
abce546a
   *s = data->head.nstate;
634a23d7
   mrecv_ack_report(m, t, data);
 }
 
3eaafa57
 static void mrecv_ack_dsync(mdata *data, struct sockaddr_in *addr)
 {
aa2b6bc8
   mhost *t;
   mfile *m;
874955ec
 
aa2b6bc8
   if(mrecv_ack_search(&t, &m, data, addr)){
3eaafa57
     return;
   }
facf6c53
   for(m=mftop[MFSEND];m;m=m->next){
bd023776
     if(m->mdata.head.reqid == data->head.reqid){
       if(m->comm){
         break;
       }
       if(m->mdata.head.nstate == MAKUO_SENDSTATE_BREAK){
         break;
       }
aa2b6bc8
     }
   }
   if(!m){
     return;
   }
8e09e847
 
   if(data->head.nstate == MAKUO_RECVSTATE_OPEN){
     if(m->mdata.head.nstate == MAKUO_SENDSTATE_DATA){
       if(data->head.seqno == m->mdata.head.seqno){
         if(!set_hoststate(t, m, data->head.nstate)){
facf6c53
           lprintf(0, "[error] %s: not allocate state area\n", __func__);
8e09e847
         }
       }
facf6c53
       return;
8e09e847
     }
   }
e42e0b01
   if(!set_hoststate(t, m, data->head.nstate)){
facf6c53
     lprintf(0, "[error] %s: not allocate state area\n", __func__);
3eaafa57
   }
874955ec
 }
 
 static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr)
 {
806dbe3b
   mhost     *t;
   mfile     *m;
   uint32_t err;
686a33f7
   uint32_t res;
806dbe3b
   uint16_t len;
442bde46
   char path[PATH_MAX];
806dbe3b
 
aa2b6bc8
   if(mrecv_ack_search(&t, &m, data, addr)){
874955ec
     return;
   }
aa2b6bc8
   if(!set_hoststate(t, m, data->head.nstate)){
1a2a228f
     lprintf(0, "%s: not allocate state area\n", __func__);
     return;
   }
13671329
   if(m->mdata.head.nstate == MAKUO_SENDSTATE_OPEN){
1a2a228f
     m->initstate = 1;
     m->sendwait  = 0;
13671329
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
686a33f7
     if(data->head.nstate == MAKUO_RECVSTATE_OPEN){
       data->p=data->data;
       m->mdata.p=m->mdata.data;
       m->mdata.head.szdata = 0;
442bde46
       while(!data_safeget16(data, &len)){
686a33f7
         m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
442bde46
         data_safeget32(data, &res);
686a33f7
         len -= sizeof(uint32_t);
442bde46
         data_safeget(data, path, len);
686a33f7
         path[len] =  0;
 
         err = 0;
         if(m->dryrun){
442bde46
           lprintf(1, "%s: (dryrun) delete %s\n", __func__, path);
806dbe3b
         }else{
442bde46
           if(!mremove(NULL,path)){
             lprintf(1, "%s: delete %s\n", __func__, path);
686a33f7
           }else{
             err = errno;
442bde46
             lprintf(0, "%s: delete error %s (%s)\n", __func__, path, strerror(errno));
686a33f7
           }
806dbe3b
         }
442bde46
         data_safeset16(&(m->mdata), len + sizeof(uint32_t));
         data_safeset32(&(m->mdata), err);
         data_safeset(&(m->mdata), path, len);
806dbe3b
       }
     }
3eaafa57
   }
 }
 
634a23d7
 static void mrecv_ack(mdata *data, struct sockaddr_in *addr)
 {
   switch(data->head.opcode){
     case MAKUO_OP_PING:
aa2b6bc8
       mrecv_ack_ping(data,  addr);
634a23d7
       break;
8f9aeac1
     case MAKUO_OP_SEND:
aa2b6bc8
       mrecv_ack_send(data,  addr);
634a23d7
       break;
     case MAKUO_OP_MD5:
aa2b6bc8
       mrecv_ack_md5(data,   addr);
634a23d7
       break;
3eaafa57
     case MAKUO_OP_DSYNC:
       mrecv_ack_dsync(data, addr);
       break;
874955ec
     case MAKUO_OP_DEL:
aa2b6bc8
       mrecv_ack_del(data,   addr);
874955ec
       break;
8f9aeac1
     /* 機能追加はここへ */
634a23d7
   }
 }
 
 /******************************************************************
 *
8f9aeac1
 * Request receive functions (for destination node tasks)
634a23d7
 *
 *******************************************************************/
874955ec
 static mfile *mrecv_req_search(mdata *data, struct sockaddr_in *addr)
 {
eeef442d
   mfile *m = mftop[MFRECV];
874955ec
   while(m){
     if(!memcmp(&m->addr, addr, sizeof(m->addr)) && m->mdata.head.reqid == data->head.reqid){
       break;
     }
     m = m->next;
   }
   return(m);
 }
 
634a23d7
 static void mrecv_req_ping(mdata *data, struct sockaddr_in *addr)
86badd32
 {
   mping *p;
874955ec
   mfile *a;
634a23d7
   char buff[MAKUO_HOSTNAME_MAX + 1];
86badd32
   member_add(&addr->sin_addr, data);
806dbe3b
   a = mkack(data, addr, MAKUO_RECVSTATE_NONE);
86badd32
   if(gethostname(buff, sizeof(buff)) == -1){
     buff[0] = 0;
   }
874955ec
   p = (mping *)(a->mdata.data);
86badd32
   p->hostnamelen = strlen(buff);
c510892b
   p->versionlen  = strlen(PACKAGE_VERSION);
874955ec
   a->mdata.head.szdata = sizeof(mping) + p->hostnamelen + p->versionlen;
   a->mdata.p = p->data;
   memcpy(a->mdata.p, buff, p->hostnamelen);
   a->mdata.p += p->hostnamelen;
   memcpy(a->mdata.p, PACKAGE_VERSION, p->versionlen);
   a->mdata.p += p->versionlen;
86badd32
   p->hostnamelen = htons(p->hostnamelen);
   p->versionlen  = htons(p->versionlen);
eeef442d
   msend(a);
86badd32
 }
 
634a23d7
 static void mrecv_req_exit(mdata *data, struct sockaddr_in *addr)
fc430629
 { 
aa2b6bc8
   mhost *t = member_get(&(addr->sin_addr));
fc430629
   if(!t){
     return;
   }
eeef442d
   member_del_message(0, t, "member exit");
8f9aeac1
   member_del(t);
86badd32
 }
 
abce546a
 static void mrecv_req_send_break(mfile *m, mdata *r)
86badd32
 {
d6e8005b
   msend(mkack(r, &(m->addr), MAKUO_RECVSTATE_IGNORE));
abce546a
   mrecv_mfdel(m);
 }
86badd32
 
abce546a
 static void mrecv_req_send_stat(mfile *m, mdata *r)
 {
   struct stat fs;
 
   if(moption.dontrecv){
     m->mdata.head.nstate = MAKUO_RECVSTATE_READONLY;
86badd32
   }else{
7ec57664
     if(r->head.flags & MAKUO_FLAG_SYNC){
       if(m->mdata.head.nstate == MAKUO_RECVSTATE_NONE){
         m->mdata.head.nstate = MAKUO_RECVSTATE_DELETEOK;
         if(r->head.flags & MAKUO_FLAG_DRYRUN){
           if(lstat(m->fn, &fs) == -1 && errno == ENOENT){
             m->mdata.head.nstate = MAKUO_RECVSTATE_DELETENG;
           }
         }else{
           if(mremove(NULL, m->fn) == -1){
             m->mdata.head.nstate = MAKUO_RECVSTATE_DELETENG;
           }
         }
       }
86badd32
     }else{
7ec57664
       if(S_ISLNK(m->fs.st_mode)){
         m->mdata.head.nstate = linkcmp(m);
abce546a
       }else{
7ec57664
         if(lstat(m->fn, &fs) == -1){
           m->mdata.head.nstate = MAKUO_RECVSTATE_UPDATE;
         }else{
           m->mdata.head.nstate = statcmp(&(m->fs), &fs);
         }
86badd32
       }
     }
   }
d6e8005b
   msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
86badd32
 }
 
abce546a
 static void mrecv_req_send_open(mfile *m, mdata *r)
86badd32
 {
   char fpath[PATH_MAX];
   char tpath[PATH_MAX];
 
69b027c3
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_UPDATE){
4d9dbfce
     if(m->mdata.head.ostate == MAKUO_RECVSTATE_UPDATE){
       msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     }
91adf830
     return;
69b027c3
   }
86badd32
 
e5b6322e
   mtempname(moption.base_dir, m->fn, m->tn);
86badd32
   sprintf(fpath, "%s/%s", moption.base_dir, m->fn);
   sprintf(tpath, "%s/%s", moption.base_dir, m->tn);
abce546a
   m->mdata.head.ostate = m->mdata.head.nstate;
57179b09
   m->mdata.head.nstate = MAKUO_RECVSTATE_OPENERROR;
e5b6322e
 
86badd32
   if(S_ISLNK(m->fs.st_mode)){
abce546a
     if(!mcreatelink(moption.base_dir, m->tn, m->ln)){
57179b09
       m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
e5b6322e
       set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
86badd32
     }else{
abce546a
       lprintf(0, "%s: symlink error %s -> %s\n", __func__, m->ln, m->fn);
f31a5410
       m->mdata.head.error  = errno;
86badd32
     }
   }else{
     if(S_ISDIR(m->fs.st_mode)){
       if(!is_dir(fpath)){
57179b09
         mcreatedir(moption.base_dir, m->fn, m->fs.st_mode);
         mkdir(fpath, m->fs.st_mode);
86badd32
       }
57179b09
       if(is_dir(fpath)){
         m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
e5b6322e
         set_filestat(fpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
57179b09
       }else{
8f9aeac1
         lprintf(0,"%s: mkdir error %s\n", __func__, m->fn);
f31a5410
         m->mdata.head.error = errno;
86badd32
       }
     }
     if(S_ISREG(m->fs.st_mode)){
abce546a
       m->fd = mcreatefile(moption.base_dir, m->tn, m->fs.st_mode);
57179b09
       if(m->fd != -1){
         m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
       }else{
69b027c3
         lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn);
f31a5410
         m->mdata.head.error = errno;
57179b09
       }
     }
     if(S_ISCHR(m->fs.st_mode)){
       if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){
         m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
e5b6322e
         set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
57179b09
       }else{
69b027c3
         lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn);
f31a5410
         m->mdata.head.error = errno;
57179b09
       }
     }
     if(S_ISBLK(m->fs.st_mode)){
       if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){
         m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
e5b6322e
         set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
57179b09
       }else{
69b027c3
         lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn);
f31a5410
         m->mdata.head.error = errno;
57179b09
       }
     }
     if(S_ISFIFO(m->fs.st_mode)){
       if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){
         m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
e5b6322e
         set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
57179b09
       }else{
69b027c3
         lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn);
f31a5410
         m->mdata.head.error = errno;
86badd32
       }
     }
   }
d6e8005b
   msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
abce546a
 }
91adf830
 
481bf811
 static void mrecv_req_send_mark(mfile *m, mdata *r)
abce546a
 {
ba2caa89
   mmark *mm;
   mfile  *a;
 
f31a5410
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
abce546a
     return;
f31a5410
   }
481bf811
   if(m->mdata.head.seqno < m->seqnomax){
     seq_addmark(m, m->mdata.head.seqno, m->seqnomax);
     m->mdata.head.seqno = m->seqnomax;
   }
   m->lickflag = 1;
1ed29a26
   a = mkack(&(m->mdata),&(m->addr),MAKUO_RECVSTATE_MARK);
   if(!a){
7680186f
     lprintf(0, "[error] %s: out of momory\n", __func__);
1ed29a26
     return;
   }
   if(a->mdata.head.szdata){
7680186f
     msend(a);
1ed29a26
     return;
   }
f31a5410
   for(mm=m->mark;mm;mm=mm->next){
ba2caa89
     if(data_safeset32(&(a->mdata), mm->l)){
       break;
     }
     if(data_safeset32(&(a->mdata), mm->h)){
       a->mdata.head.szdata -= sizeof(uint32_t);
1ed29a26
       break;
ba2caa89
     }
86badd32
   }
d6e8005b
   msend(a);
481bf811
 }
abce546a
 
481bf811
 static void mrecv_req_send_data_write(mfile *m, mdata *r)
86badd32
 {
ba2caa89
   off_t offset;
481bf811
   if(r->head.szdata == 0){
91adf830
     return;
481bf811
   }
7680186f
   offset  = r->head.seqno;
ba2caa89
   offset *= MAKUO_BUFFER_SIZE;
   if(lseek(m->fd, offset, SEEK_SET) == -1){
7680186f
     m->mdata.head.error  = errno;
481bf811
     m->mdata.head.ostate = m->mdata.head.nstate;
     m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
eeef442d
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
7680186f
     lprintf(0, "[error] %s: seek error (%s) seq=%u\n",
       __func__,
       strerror(m->mdata.head.error), 
       (int)(r->head.seqno));
     return; /* lseek error */
481bf811
   }
7680186f
   if(write(m->fd, r->data, r->head.szdata) == -1){
     m->mdata.head.error  = errno;
     m->mdata.head.ostate = m->mdata.head.nstate;
     m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
eeef442d
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
7680186f
     lprintf(0, "[error] %s: write error (%s) seqno=%d size=%d %s\n",
43a6533f
       __func__,
7680186f
       strerror(m->mdata.head.error), 
43a6533f
       (int)(r->head.seqno), 
       r->head.szdata,
       m->fn);
7680186f
     return; /* write error */
481bf811
   }
7680186f
   m->recvcount++;
481bf811
 }
86badd32
 
481bf811
 static void mrecv_req_send_data_retry(mfile *m, mdata *r)
 {
f31a5410
   mmark *mm;
1ed29a26
   mfile *a = mkack(&(m->mdata), &(m->addr), MAKUO_RECVSTATE_OPEN);
   if(!a){
     lprintf(0, "%s: out of momory\n", __func__);
     return;
   }
eeef442d
   if(a->mdata.head.szdata == 0){
     data_safeset32(&(a->mdata), m->mdata.head.seqno);
     data_safeset32(&(a->mdata), r->head.seqno);
     for(mm=m->mark;mm;mm=mm->next){
       if(data_safeset32(&(a->mdata), mm->l)){
         break;
       }
       if(data_safeset32(&(a->mdata), mm->h)){
         a->mdata.head.szdata -= sizeof(uint32_t);
         break;
       }
ba2caa89
     }
481bf811
   }
d6e8005b
   msend(a);
481bf811
 }
 
 static void mrecv_req_send_data(mfile *m, mdata *r)
 {
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
     return;
   }
   if(m->lickflag){
     if(seq_delmark(m, r->head.seqno)){
       mrecv_req_send_data_write(m, r);
     }
     return;
   }
   if(m->mdata.head.seqno > r->head.seqno){
     if(seq_delmark(m, r->head.seqno)){
       mrecv_req_send_data_write(m, r);
     }
     return;
   }
   if(m->mdata.head.seqno < r->head.seqno){
f31a5410
     mrecv_req_send_data_retry(m, r);
ba2caa89
     seq_addmark(m, m->mdata.head.seqno, r->head.seqno);
481bf811
     m->mdata.head.seqno = r->head.seqno;
86badd32
   }
481bf811
   mrecv_req_send_data_write(m, r);
   m->mdata.head.seqno++;
86badd32
 }
 
abce546a
 static void mrecv_req_send_close(mfile *m, mdata *r)
86badd32
 {
abce546a
   struct stat fs;
91adf830
   struct utimbuf mftime;
f10a7701
   char   path[PATH_MAX];
91adf830
 
f10a7701
   if(m->mdata.head.nstate == MAKUO_RECVSTATE_CLOSE || 
      m->mdata.head.nstate == MAKUO_RECVSTATE_CLOSEERROR){
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     return;
   }
 
7680186f
   m->mdata.head.ostate = m->mdata.head.nstate;
f10a7701
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
7680186f
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
f10a7701
     return;
   }
7680186f
 
f10a7701
   mftime.actime  = m->fs.st_ctime; 
   mftime.modtime = m->fs.st_mtime;
7680186f
  
f10a7701
   if(S_ISLNK(m->fs.st_mode)){
     if(!mrename(moption.base_dir, m->tn, m->fn)){
       m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
       msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
abce546a
     }else{
f10a7701
       m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
       msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
facf6c53
       lprintf(0, "[error] %s: close error %s -> %s\n", __func__, m->ln, m->fn);
f10a7701
       mremove(moption.base_dir, m->tn);
91adf830
     }
f10a7701
     return;
   }
 
   if(S_ISDIR(m->fs.st_mode)){
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     sprintf(path, "%s/%s", moption.base_dir, m->fn);
     utime(path, &mftime);
     return;
86badd32
   }
d93eefa2
 
f10a7701
   if(!S_ISREG(m->fs.st_mode)){
     if(!mrename(moption.base_dir, m->tn, m->fn)){
5f0d41cd
       m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
f10a7701
       msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
       sprintf(path, "%s/%s", moption.base_dir, m->tn);
       utime(path, &mftime);
     }else{
       m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
       msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
       lprintf(0, "[error] %s: close error %s (can't rename)\n", __func__, m->fn);
       mremove(moption.base_dir, m->tn);
     }
     return;
5f0d41cd
   }
 
f10a7701
   if(m->fd == -1){
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     lprintf(0, "[error] %s: bat discriptor close error %s\n", __func__, m->fn);
     mremove(moption.base_dir, m->tn);
     return;
   }
 
   if(fstat(m->fd, &fs) == -1){
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
facf6c53
     m->mdata.head.error  = errno;
f10a7701
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     lprintf(0, "[error] %s: %s fstat error %s\n", __func__, strerror(errno), m->fn);
     return; 
   }
 
   if(close(m->fd) == -1){
     m->fd = -1;
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
facf6c53
     m->mdata.head.error  = errno;
f10a7701
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     lprintf(0, "[error] %s: %s close error %s\n", __func__, strerror(errno), m->fn);
     return; 
   }
 
   m->fd = -1;
   if(fs.st_size != m->fs.st_size){
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     lprintf(0, "[error] %s: close error %s (file size mismatch %d != %d)\n", __func__, m->fn, (int)(fs.st_size), (int)(m->fs.st_size));
     lprintf(0, "[error] %s: seq=%d max=%d mark=%d recv=%d\n", __func__, m->mdata.head.seqno, m->seqnomax, m->markcount, m->recvcount);
     mremove(moption.base_dir, m->tn);
     return;
   }
 
   if(!mrename(moption.base_dir, m->tn, m->fn)){
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     sprintf(path, "%s/%s", moption.base_dir, m->fn);
     set_filestat(path, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode);
     utime(path, &mftime);
   }else{
     m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
     msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
     lprintf(0, "%s: close error %s (can't rename)\n", __func__, m->fn);
     mremove(moption.base_dir, m->tn);
   }
91adf830
 }
 
abce546a
 static void mrecv_req_send_last(mfile *m, mdata *r)
 {
d6e8005b
   msend(mkack(r, &(m->addr), MAKUO_RECVSTATE_LAST));
abce546a
   mrecv_mfdel(m);
 }
 
 static void mrecv_req_send_next(mfile *m, mdata *r)
91adf830
 {
86badd32
   switch(r->head.nstate){
91adf830
     case MAKUO_SENDSTATE_STAT:
abce546a
       mrecv_req_send_stat(m, r);
91adf830
       break;
 
86badd32
     case MAKUO_SENDSTATE_OPEN:
abce546a
       mrecv_req_send_open(m, r);
91adf830
       break;
 
     case MAKUO_SENDSTATE_DATA:
abce546a
       mrecv_req_send_data(m, r);
91adf830
       break;
 
86badd32
     case MAKUO_SENDSTATE_MARK:
abce546a
       mrecv_req_send_mark(m, r);
91adf830
       break;
 
86badd32
     case MAKUO_SENDSTATE_CLOSE:
abce546a
       mrecv_req_send_close(m, r);
91adf830
       break;
 
     case MAKUO_SENDSTATE_LAST:
abce546a
       mrecv_req_send_last(m, r);
91adf830
       break;
 
     case MAKUO_SENDSTATE_BREAK:
abce546a
       mrecv_req_send_break(m, r);
91adf830
       break;
86badd32
   }
 }
 
abce546a
 static mfile *mrecv_req_send_create(mdata *data, struct sockaddr_in *addr)
86badd32
 {
   mstat fs;
abce546a
   mfile *m;
91adf830
   uint16_t fnlen;
   uint16_t lnlen;
43a6533f
   uint32_t  ldev;
   uint32_t  hdev;
59da6691
   uint64_t  rdev;
86badd32
 
1b4c64d4
   if((m = mrecv_req_search(data, addr))){
69b027c3
     return(m);
   }
 
d93eefa2
   if(data->head.nstate != MAKUO_SENDSTATE_STAT){
     return(NULL);
   }
 
abce546a
   /* create object */
69d91074
   if(!(m = mfadd(MFRECV))){
facf6c53
     lprintf(0, "[error] %s: out of momory\n", __func__);
abce546a
     return(NULL);
   }
 
1ed29a26
   /* copy header and addr */
abce546a
   memcpy(&(m->addr), addr, sizeof(m->addr));
   memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head));
 
   /* read mstat */
1ed29a26
   data->p = data->data;
43a6533f
   data_safeget(data, &fs, sizeof(fs));
abce546a
 
   /* stat = mstat */
   m->fs.st_mode  = ntohl(fs.mode);
   m->fs.st_uid   = ntohs(fs.uid);
   m->fs.st_gid   = ntohs(fs.gid);
   m->fs.st_size  = ((off_t)ntohl(fs.sizeh) << 32) + (off_t)ntohl(fs.sizel);
   m->fs.st_mtime = ntohl(fs.mtime);
   m->fs.st_ctime = ntohl(fs.ctime);
   fnlen = ntohs(fs.fnlen);
   lnlen = ntohs(fs.lnlen);
 
   /* read filename */
43a6533f
   data_safeget(data, m->fn, fnlen);
abce546a
   m->fn[fnlen] = 0;
 
   /* read linkname */
43a6533f
   data_safeget(data, m->ln, lnlen);
abce546a
   m->ln[lnlen] = 0;
 
57179b09
   /* rdev */
43a6533f
   data_safeget32(data, &hdev);
   data_safeget32(data, &ldev);
   rdev  = hdev;
   rdev <<=  32;
   rdev |= ldev;
59da6691
   m->fs.st_rdev = (dev_t)rdev;
57179b09
 
abce546a
   /* Number of blocks */
   m->seqnomax = m->fs.st_size / MAKUO_BUFFER_SIZE;
   if(m->fs.st_size % MAKUO_BUFFER_SIZE){
     m->seqnomax++; 
   }
 
   return(m);
 }
 
 static void mrecv_req_send(mdata *data, struct sockaddr_in *addr)
 {
69b027c3
   mfile *m;
1b4c64d4
   if((m = mrecv_req_send_create(data, addr))){
d93eefa2
     mtimeget(&(m->lastrecv));
     mrecv_req_send_next(m, data);
   }else{
     if(data->head.nstate != MAKUO_SENDSTATE_DATA){
eeef442d
       msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE));
86badd32
     }
   }
 }
 
634a23d7
 static void mrecv_req_md5_open(mfile *m, mdata *data, struct sockaddr_in *addr)
86badd32
 {
21e2082d
   int l;
86badd32
   mhash *h;
 
   if(!m){
facf6c53
     m = mfadd(MFRECV);
86badd32
     memcpy(&(m->addr), addr, sizeof(m->addr));
     memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head));
     h = (mhash *)(data->data);
     l = ntohs(h->fnlen);
     memcpy(m->fn, h->filename, l);
     m->fn[l] = 0;
21e2082d
     memcpy(m->mdata.data, h->hash, 16); 
86badd32
     m->fd = open(m->fn, O_RDONLY);
     if(m->fd == -1){
f31a5410
       m->mdata.head.error  = errno;
86badd32
       m->mdata.head.nstate = MAKUO_RECVSTATE_OPENERROR;
     }else{
21e2082d
       m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
       m->link = mkack(&(m->mdata), &(m->addr), MAKUO_RECVSTATE_NONE);
       m->link->link = m;
       MD5_Init(&(m->md5));
86badd32
     }
   }
   mtimeget(&(m->lastrecv));
eeef442d
   msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
86badd32
 }
 
634a23d7
 static void mrecv_req_md5_close(mfile *m, mdata *data, struct sockaddr_in *addr)
86badd32
 {
21e2082d
   if(m){
     if(m->link){
       close(m->fd);
       m->fd = -1;
       MD5_Final(m->mdata.data, &(m->md5));
       m->link->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
       m->link->link = NULL;
       m->link = NULL;
     }
     mrecv_mfdel(m);
   }
eeef442d
   msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
86badd32
 }
 
634a23d7
 static void mrecv_req_md5(mdata *data, struct sockaddr_in *addr)
86badd32
 {
1ed29a26
   mfile *m = mrecv_req_search(data, addr);
86badd32
   switch(data->head.nstate){
     case MAKUO_SENDSTATE_OPEN:
634a23d7
       mrecv_req_md5_open(m, data, addr);
86badd32
       break;
     case MAKUO_SENDSTATE_CLOSE:
634a23d7
       mrecv_req_md5_close(m, data, addr);
86badd32
       break;
   }
 }
 
3b736b7c
 static int dsync_write(int fd, char *base, uint8_t sta, uint16_t len, uint32_t mod)
3eaafa57
 {
13671329
   int r;
   size_t s;
   fd_set wfds;
   char buff[PATH_MAX + sizeof(sta) +  sizeof(mod) +  sizeof(len)];
   char *p = buff;
874955ec
 
806dbe3b
   if(!loop_flag){
3b736b7c
     return(1);
806dbe3b
   }
   if(!strcmp(base, ".")){
3b736b7c
     return(0);
874955ec
   }
3b736b7c
   while(len >= 2){
     if(memcmp(base, "./", 2)){
       break;
     }else{
       base += 2;
       len  -= 2;
       if(!len){
         return(0);
       }
     }
806dbe3b
   }
13671329
   while(*base == '/'){
     base++;
     len--;
3b736b7c
     if(!len){
       return(0);
     }
13671329
   }
881df679
 
3b736b7c
   *(uint16_t *)p = len + sizeof(mod);
881df679
   p += sizeof(len);
   *(uint32_t *)p = mod; 
   p += sizeof(mod);
13671329
   memcpy(p, base, len);
 
   p = buff;
686a33f7
   s = sizeof(len) + sizeof(mod) + len;
13671329
   while(s){
     FD_ZERO(&wfds);
     FD_SET(fd,&wfds);
     if(select(1024, NULL, &wfds, NULL, NULL) < 0){
       if(!loop_flag){
3b736b7c
         return(1);
13671329
       }
       continue;
     }
     if(FD_ISSET(fd,&wfds)){
       r = write(fd, p, s);
       if(r == -1){
881df679
         if(errno == EINTR){
           continue;
         }
3b736b7c
         lprintf(0, "[error] %s: write error %s\n", __func__, base);
         return(-1);
13671329
       }else{
         s -= r;
         p += r;
       }
     }
   }
3b736b7c
   return(0);
874955ec
 }
 
3b736b7c
 static int dsync_scan(int fd, char *base, int recurs, excludeitem *e)
874955ec
 {
3b736b7c
   int r;
874955ec
   DIR *d;
13671329
   uint16_t len;
874955ec
   struct stat st;
   struct dirent *dent;
   char path[PATH_MAX];
806dbe3b
 
   if(!loop_flag){
3b736b7c
     return(1);
806dbe3b
   }
cf58cd6d
   /*----- read only -----*/
   if(moption.dontrecv){
     return(0);
   }
8e09e847
   /*----- exclude -----*/
   if(isexclude(base, e, 0)){
3b736b7c
     return(0);
8e09e847
   }
13671329
   len = strlen(base);
   if(lstat(base, &st) == -1){
3b736b7c
     return(dsync_write(fd, base, MAKUO_SENDSTATE_ERROR, len, st.st_mode));
874955ec
   }
   if(S_ISLNK(st.st_mode)){
3b736b7c
     return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode));
874955ec
   }
   if(!S_ISDIR(st.st_mode)){
3b736b7c
     return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode));
874955ec
   }
8e09e847
   /*----- exclude dir -----*/
   if(isexclude(base, e, 1)){
3b736b7c
     return(0);
8e09e847
   }
3b736b7c
 
   /*----- dir scan -----*/
1b4c64d4
   if((d = opendir(base))){
     while((dent=readdir(d))){
3b736b7c
       if(!loop_flag){
806dbe3b
         break;
3b736b7c
       }
       if(!strcmp(dent->d_name, ".")){
874955ec
         continue;
3b736b7c
       }
       if(!strcmp(dent->d_name, "..")){
874955ec
         continue;
3b736b7c
       }
874955ec
       sprintf(path, "%s/%s", base, dent->d_name);
806dbe3b
       if(recurs){
1b4c64d4
         if((r = dsync_scan(fd, path, recurs, e))){
3b736b7c
           closedir(d);
           return(r);
         }
806dbe3b
       }else{
13671329
         len = strlen(path);
1b4c64d4
         if((r = dsync_write(fd, path, MAKUO_SENDSTATE_STAT, len, st.st_mode))){
3b736b7c
           closedir(d);
           return(r);
         }
806dbe3b
       }
3eaafa57
     }
874955ec
     closedir(d);
13671329
     len = strlen(base);
3b736b7c
     return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode));
3eaafa57
   }
3b736b7c
   return(dsync_write(fd, base, MAKUO_SENDSTATE_ERROR, len, st.st_mode));
3eaafa57
 }
 
 static void mrecv_req_dsync_open(mfile *m, mdata *data, struct sockaddr_in *addr)
 {
eeef442d
   msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
806dbe3b
   if(m){
     return;
3eaafa57
   }
eeef442d
   m = mfadd(MFRECV);
806dbe3b
   m->mdata.head.opcode = data->head.opcode;
   m->mdata.head.reqid  = data->head.reqid;
aa2b6bc8
   m->mdata.head.flags  = data->head.flags;
806dbe3b
   m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
   memcpy(&(m->addr), addr, sizeof(m->addr));
   if(data->head.szdata){
     memcpy(m->fn, data->data, data->head.szdata);
   }
   m->fn[data->head.szdata] = 0;
d38d0833
   mtimeget(&(m->lastrecv));
8e09e847
 }
 
 static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr)
 {
   int  pid;
   int p[2];
   mfile *d;
   char path[PATH_MAX];
   uint16_t len;
 
eeef442d
   msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
d38d0833
   if(!m){
     return;
   }
 
   mtimeget(&(m->lastrecv));
8e09e847
   if(m->mdata.head.seqno >= data->head.seqno){
     return;
   }
 
   if(data->head.szdata){
     data->p = data->data;
     while(!data_safeget16(data, &len)){
       data_safeget(data, path, len);
       path[len] = 0;
       m->exclude = exclude_add(m->exclude, path);
       m->mdata.head.seqno++;
     }
     return;
   }
806dbe3b
 
facf6c53
   d = mfins(MFSEND);
442bde46
   d->link = m;
   m->link = d;
686a33f7
   d->initstate = 1;
   d->sendwait  = 0;
8e09e847
   d->exclude = m->exclude;
   m->exclude = NULL;
806dbe3b
   d->mdata.head.opcode = MAKUO_OP_DEL;
   d->mdata.head.reqid  = getrid();
   d->mdata.head.flags  = data->head.flags;
   d->mdata.head.seqno  = data->head.reqid;
   d->mdata.head.nstate = MAKUO_SENDSTATE_STAT;
   memcpy(&(d->addr), addr, sizeof(d->addr));
8e09e847
   strcpy(d->fn, m->fn);
aa2b6bc8
   d->sendto = 1;
806dbe3b
   if(d->mdata.head.flags & MAKUO_FLAG_RECURS){
     d->recurs = 1;
   }
   if(d->mdata.head.flags & MAKUO_FLAG_DRYRUN){
     d->dryrun = 1;
   }
 
   /* fork */
   pipe(p);
   pid = fork();
   if(pid == -1){
7680186f
     lprintf(0, "%s: %s fork error\n", __func__, strerror(errno));
806dbe3b
     close(p[0]);
     close(p[1]);
     return;
   }
   if(pid){
     /* parent */
     d->pid  = pid;
     d->pipe = p[0];
     close(p[1]); /* write close */
1b4c64d4
     while((d->exclude = exclude_del(d->exclude)));
806dbe3b
   }else{
     /* child */
     close(p[0]); /* read close */
8e09e847
     dsync_scan(p[1], d->fn, d->recurs, d->exclude);
806dbe3b
     close(p[1]);
     _exit(0);
   }  
3eaafa57
 }
 
69d91074
 static void mrecv_req_dsync_close(mfile *m, mdata *data, struct sockaddr_in *addr)
 {
   if(!m){
eeef442d
     msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
69d91074
   }else{
d38d0833
     mtimeget(&(m->lastrecv));
     if(m->link){
       msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
     }else{
       msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
       mrecv_mfdel(m); 
     }
69d91074
   }
 }
 
806dbe3b
 static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *addr)
 {
d38d0833
   msend(mkack(data, addr, MAKUO_RECVSTATE_BREAK));
806dbe3b
   if(m){
     if(m->link){
       m->link->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
       m->link->sendwait = 0;
     }
     mrecv_mfdel(m);
   }
 }
 
3eaafa57
 /*
  *  dsync
  */
 static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr)
 {
874955ec
   mfile *m = mrecv_req_search(data, addr);
3eaafa57
   switch(data->head.nstate){
     case MAKUO_SENDSTATE_OPEN:
       mrecv_req_dsync_open(m, data, addr);
       break;
8e09e847
     case MAKUO_SENDSTATE_DATA:
       mrecv_req_dsync_data(m, data, addr);
       break;
69d91074
     case MAKUO_SENDSTATE_CLOSE:
       mrecv_req_dsync_close(m, data, addr);
       break;
806dbe3b
     case MAKUO_SENDSTATE_BREAK:
       mrecv_req_dsync_break(m, data, addr);
       break;
874955ec
   }
 }
 
 /*
  *  del
  */
13671329
 static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr)
874955ec
 {
   uint16_t len;
13671329
   uint32_t mod;
686a33f7
   mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN);
4dd3b6a8
   mfile *m = mrecv_req_search(data, addr);
   mhost *t = member_get(&(addr->sin_addr));
442bde46
   char path[PATH_MAX];
1a2a228f
 
13671329
   if(!a){
881df679
     lprintf(0, "[error] %s: ack can't create\n", __func__);
13671329
     return;
   }
442bde46
   data->p = data->data;
8e09e847
   while(!data_safeget16(data, &len)){
     data_safeget32(data, &mod);
     len -= sizeof(uint32_t);
     data_safeget(data, path, len);
     path[len] =  0;
8f5b78cc
 #ifdef MAKUO_DEBUG
     lprintf(9, "%s: %s", __func__, path);
 #endif
8e09e847
     if(lstat(path, &(a->fs)) == -1 && errno == ENOENT){
8f5b78cc
 #ifdef MAKUO_DEBUG
       lprintf(9, " [DELETE]");
 #endif
8e09e847
       data_safeset16(&(a->mdata), len + sizeof(uint32_t));
       data_safeset32(&(a->mdata), 0);
       data_safeset(&(a->mdata), path, len);
13671329
     }
8f5b78cc
 #ifdef MAKUO_DEBUG
     lprintf(9, "\n");
 #endif
13671329
   }
eeef442d
   msend(a);
4dd3b6a8
 
   if(m){
     return;
   }
   m = mfadd(MFRECV);
   m->mdata.head.opcode = data->head.opcode;
   m->mdata.head.reqid  = data->head.reqid;
   m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
   memcpy(&(m->addr), addr, sizeof(m->addr));
   mtimeget(&(m->lastrecv));
   if(data->head.flags & MAKUO_FLAG_DRYRUN){
     m->dryrun = 1;
   }
8e09e847
 }
 
 static void mrecv_req_del_data_report(mfile *m, mcomm *c, uint32_t err, char *hn, char *path)
 {
   char *dryrun = "";
 
   if(m->dryrun){
     dryrun = "(dryrun) ";
   }
   if(err){
05348f1d
     cprintf(0, c,  "delete error (%s) %s:%s\n", strerror(err), hn, path);
     lprintf(1, "delete error (%s) %s:%s\n", strerror(err), hn, path);
8e09e847
   }else{
     cprintf(0, c,  "%sdelete %s:%s\n", dryrun, hn, path);
05348f1d
     lprintf(1, "%sdelete %s:%s\n", dryrun, hn, path);
806dbe3b
   }
 }
 
 static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
 {
   uint32_t err;
   uint16_t len;
   char *hn = "unknown host";
aa2b6bc8
   mhost *t = member_get(&(addr->sin_addr));
806dbe3b
   mcomm *c = NULL;
eeef442d
   mfile *a = NULL;
806dbe3b
   mfile *m = mrecv_req_search(data, addr);
686a33f7
   char path[PATH_MAX];
806dbe3b
 
eeef442d
   msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
4dd3b6a8
   if(!m){
     return;
   }
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
806dbe3b
     return;
   }
   if(t){
     hn = t->hostname;
   }
eeef442d
   for(a=mftop[MFSEND];a;a=a->next){
806dbe3b
     if((a->mdata.head.reqid == data->head.seqno) && (a->comm != NULL)){
       c = a->comm;
       break;
     } 
   }
686a33f7
   data->p = data->data;
442bde46
   while(!data_safeget16(data, &len)){
     data_safeget32(data, &err);
686a33f7
     len -= sizeof(uint32_t);
442bde46
     data_safeget(data, path, len);
686a33f7
     path[len] =  0;
8e09e847
     mrecv_req_del_data_report(m, c, err, hn, path);
806dbe3b
   }
4dd3b6a8
   m->mdata.head.ostate = m->mdata.head.nstate;
   m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
806dbe3b
 }
 
 static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr)
 {
   mfile *m = mrecv_req_search(data, addr);
eeef442d
   msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
806dbe3b
   mrecv_mfdel(m);
 }
 
 static void mrecv_req_del(mdata *data, struct sockaddr_in *addr)
 {
   switch(data->head.nstate){
825350fc
     case MAKUO_SENDSTATE_OPEN: 
13671329
       mrecv_req_del_open(data, addr);
806dbe3b
       break;
825350fc
     case MAKUO_SENDSTATE_DATA: 
806dbe3b
       mrecv_req_del_data(data, addr);
       break;
825350fc
     case MAKUO_SENDSTATE_CLOSE: 
806dbe3b
       mrecv_req_del_close(data, addr);
       break;
3eaafa57
   }
 }
 
634a23d7
 static void mrecv_req(mdata *data, struct sockaddr_in *addr)
86badd32
 {
634a23d7
   switch(data->head.opcode){
86badd32
     case MAKUO_OP_PING:
13671329
       mrecv_req_ping(data,  addr);
86badd32
       break;
     case MAKUO_OP_EXIT:
13671329
       mrecv_req_exit(data,  addr);
86badd32
       break;
8f9aeac1
     case MAKUO_OP_SEND:
13671329
       mrecv_req_send(data,  addr);
86badd32
       break;
     case MAKUO_OP_MD5:
13671329
       mrecv_req_md5(data,   addr);
86badd32
       break;
3eaafa57
     case MAKUO_OP_DSYNC:
       mrecv_req_dsync(data, addr);
       break;
874955ec
     case MAKUO_OP_DEL:
13671329
       mrecv_req_del(data,   addr);
874955ec
       break;
     default:
eeef442d
       msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE));
874955ec
       break;
86badd32
   }
 }
806dbe3b
 
27b2005f
 /******************************************************************
 *
 * Receive common functions (public)
 *
 *******************************************************************/
 void mrecv_gc()
 {
   mhost *t = members;
eeef442d
   mfile *m = mftop[MFRECV]; 
27b2005f
 
   /* file timeout */
   while(m){
     if(mtimeout(&(m->lastrecv), MAKUO_RECV_GCWAIT)){
eeef442d
       lprintf(0,"%s: mfile object GC state=%s %s\n", __func__, strrstate(m->mdata.head.nstate), m->fn);
27b2005f
       m = mrecv_mfdel(m);
       continue;
     }
     m = m->next;
   }
 
   /* pong timeout */
   while(t){
     if(!mtimeout(&(t->lastrecv), MAKUO_PONG_TIMEOUT)){
       t = t->next;
     }else{
       lprintf(0,"%s: pong timeout %s\n", __func__, t->hostname);
eeef442d
       member_del_message(1, t, "pong time out");
27b2005f
       if(t->next){
         t = t->next;
         member_del(t->prev);
       }else{
         member_del(t);
         t = NULL;
      } 
     }      
   }
 }
 
 void mrecv_clean()
 {
eeef442d
   mfile *m = mftop[MFRECV];
1b4c64d4
   while((m = mrecv_mfdel(m)));
27b2005f
 }
 
d6e8005b
 int mrecv()
43a6533f
 {
d6e8005b
   mfile *m;
668cce42
   mhost *t;
d6e8005b
   mdata data;
43a6533f
   struct sockaddr_in addr;
d6e8005b
 
eeef442d
   m = mftop[MFSEND];
d6e8005b
   if(mrecv_packet(moption.mcsocket, &data, &addr) == -1){
43a6533f
     return(0);
   }
1b4c64d4
   if((t = member_get(&addr.sin_addr))){
668cce42
     mtimeget(&(t->lastrecv));
   }
43a6533f
   if(data.head.flags & MAKUO_FLAG_ACK){
     mrecv_ack(&data, &addr);
   }else{
     mrecv_req(&data, &addr);
   }
eeef442d
   return(m == mftop[MFSEND]);
43a6533f
 }