msend.c
c858e9b6
 /*
  * msend.c
83cbbef8
  * Copyright (C) 2008-2012 KLab Inc. 
c858e9b6
  */
86badd32
 #include "makuosan.h"
 
8f9aeac1
 /******************************************************************
 *
 * send common functions (private)
 *
 *******************************************************************/
86badd32
 /*
  * Tear down one mfile entry and hand it to mfdel().
  *
  * Closes m->fd and m->pipe when they are not -1, sends SIGTERM to the
  * process recorded in m->pid and waits for it, clears both directions of
  * the m->link association, keeps calling delmark() until the mark list
  * pointer becomes NULL, clears the host state and finally calls mfdel().
  *
  * Returns the value m->next had on entry, or NULL when m is NULL.
  */
 static mfile *msend_mfdel(mfile *m)
 {
   mfile *following;

   if(m == NULL){
     return(NULL);
   }
   /* remember the successor before m is handed to mfdel() */
   following = m->next;

   if(m->fd != -1){
     close(m->fd);
   }
   if(m->pipe != -1){
     close(m->pipe);
   }
   if(m->pid != 0){
     kill(m->pid, SIGTERM);
     waitpid(m->pid, NULL, 0);
   }
   if(m->link != NULL){
     /* clear the peer's pointer first, then our own */
     m->link->link = NULL;
     m->link       = NULL;
   }
   /* delmark() is invoked at least once and until it yields NULL */
   do{
     m->mark = delmark(m->mark);
   }while(m->mark);
   clr_hoststate(m);
   mfdel(m);
   return(following);
 }
 
 /*
  * Encrypt the payload of data in place when moption.cryptena is set.
  *
  * With encryption enabled the MAKUO_FLAG_CRYPT bit is added to head.flags.
  * For a non-empty payload the MD5 digest of the plaintext is written to
  * head.hash and the payload is then passed to BF_encrypt() in 8-byte steps.
  *
  * Returns the number of payload bytes to transmit: head.szdata when
  * nothing was encrypted, otherwise the offset reached by the 8-byte loop
  * (head.szdata rounded up to a multiple of 8).
  */
 static int msend_encrypt(mdata *data)
 {
   MD5_CTX digest;
   int     plain = data->head.szdata;
   int     off;

   if(!moption.cryptena){
     return(plain);
   }
   data->head.flags |= MAKUO_FLAG_CRYPT;
   if(data->head.szdata == 0){
     return(plain);
   }

   /* hash the plaintext before it is overwritten by the cipher text */
   MD5_Init(&digest);
   MD5_Update(&digest, data->data, data->head.szdata);
   MD5_Final(data->head.hash, &digest);

   for(off = 0; off < data->head.szdata; off += 8){
     BF_encrypt((BF_LONG *)(data->data + off), &EncKey);
   }
   return(off);
 }
 
d6e8005b
 static int msend_readywait()
 {
   fd_set fds;
46e655e5
   time_t tm;
d6e8005b
   struct timeval tv;
46e655e5
 
e27f80f1
   if(moption.sendrate){
46e655e5
     tm = time(NULL);
e27f80f1
     while(tm == send_time){
       if(moption.sendrate > send_rate){
         break;
       }
       usleep(1000);
       tm = time(NULL);
     }
     if(tm != send_time){
       view_rate = send_rate;
       send_rate =  0;
       send_time = tm;
     }
46e655e5
   }
d6e8005b
   while(!moption.sendready){
     FD_ZERO(&fds);
     FD_SET(moption.mcsocket, &fds);
     tv.tv_sec  = 1;
     tv.tv_usec = 0;
     if(select(1024, NULL, &fds, NULL, &tv) == 1){
       moption.sendready = FD_ISSET(moption.mcsocket, &fds);
     }else{
       if(loop_flag){
         continue;
       }else{
         break;
       }
     }
   }
   return(moption.sendready);
 }
 
86badd32
 /*
  * Transmit one packet to addr through socket s.
  *
  * data itself is left untouched: a copy is made, passed to
  * msend_encrypt(), its header fields szdata/flags/reqid/seqno/error are
  * converted with htons()/htonl(), and the copy is sent with sendto().
  * head.maddr and head.mport are sent exactly as stored (no conversion is
  * applied to them).
  *
  * Before each attempt msend_readywait() must report readiness.  A sendto()
  * interrupted by EINTR is retried; any other failure or a short send is
  * logged and ends the attempt.  On success the number of bytes sent is
  * added to send_rate.
  *
  * Returns 0 on success, -1 on failure or when msend_readywait() gives up.
  */
 static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
 {
   int r;
   int e;
   int szdata;
   mdata senddata;

   memcpy(&senddata, data, sizeof(senddata));
   szdata = msend_encrypt(&senddata);

   senddata.head.szdata = htons(senddata.head.szdata);
   senddata.head.flags  = htons(senddata.head.flags);
   senddata.head.reqid  = htonl(senddata.head.reqid);
   senddata.head.seqno  = htonl(senddata.head.seqno);
   senddata.head.error  = htonl(senddata.head.error);
   szdata += sizeof(mhead);

   while(msend_readywait()){
     moption.sendready = 0;
     r = sendto(s, &senddata, szdata, 0, (struct sockaddr*)addr, sizeof(struct sockaddr_in));
     if(r == szdata){
       send_rate += r;
       return(0); /* success */
     }
     if(r == -1){
       /* capture errno before any other call can overwrite it */
       e = errno;
       if(e == EINTR){
         continue;
       }
       lprintf(0,"[error] %s: send error (%s) %s %s rid=%d size=%d seqno=%d\n",
         __func__,
         strerror(e),
         stropcode(data),
         strmstate(data),
         data->head.reqid,
         szdata,
         data->head.seqno);
     }else{
       lprintf(0, "[error] %s: send size error %s %s rid=%d datasize=%d sendsize=%d seqno=%d\n",
         __func__,
         stropcode(data),
         strmstate(data),
         data->head.reqid,
         szdata,
         r,
         data->head.seqno);
     }
     break;
   }
   return(-1);
 }
 
8f9aeac1
 /* retry */
825350fc
 /*
  * Retry bookkeeping performed before a request is (re)sent.
  *
  * When m is not waiting (sendwait clear) the retry counter is reset to
  * MAKUO_SEND_RETRYCNT.  Otherwise the counter is decremented and, if
  * moption.loglevel is above 1, the per-host receive state is logged:
  *   - for a unicast entry (m->sendto) only the destination host, and only
  *     while its state is MAKUO_RECVSTATE_NONE;
  *   - otherwise at loglevel 2 only hosts in MAKUO_RECVSTATE_NONE, at
  *     higher levels every host.
  *
  * Returns -1 when m is NULL, 0 in every other case.
  */
 static int msend_retry(mfile *m)
 {
   mhost   *host;
   uint8_t *state;

   if(m == NULL){
     return(-1);
   }
   if(m->sendwait == 0){
     m->retrycnt = MAKUO_SEND_RETRYCNT;
     return(0);
   }

   if(moption.loglevel > 1){
     mprintf(2, __func__, m);
     for(host = members; host; host = host->next){
       state = get_hoststate(host, m);
       if(state == NULL){
         lprintf(0, "[error] %s: can't alloc state area %s\n", __func__, host->hostname);
         continue;
       }
       if(m->sendto){
         /* unicast: report the addressed host only */
         if(memcmp(&(m->addr.sin_addr), &(host->ad), sizeof(host->ad)) != 0){
           continue;
         }
         if(*state == MAKUO_RECVSTATE_NONE){
           lprintf(2, "%s:   %s %s(%s)\n", __func__, strrstate(*state), inet_ntoa(host->ad), host->hostname);
         }
         continue;
       }
       if(moption.loglevel == 2){
         if(*state == MAKUO_RECVSTATE_NONE){
           lprintf(2, "%s:   %s %s(%s)\n", __func__, strrstate(*state), inet_ntoa(host->ad), host->hostname);
         }
       }else{
         lprintf(3, "%s:   %s %s(%s)\n", __func__, strrstate(*state), inet_ntoa(host->ad), host->hostname);
       }
     }
   }
   m->retrycnt--;
   return(0);
 }
 
 /* send & free */
86badd32
 /*
  * Send m once through socket s.  When msend_packet() reports success (0)
  * the entry is disposed of with msend_mfdel(); on failure it is kept.
  */
 static void msend_shot(int s, mfile *m)
 {
   if(msend_packet(s, &(m->mdata), &(m->addr)) != 0){
     return;
   }
   msend_mfdel(m);
 }
 
8f9aeac1
 /******************************************************************
 *
 * ack send functions (for destination node tasks)
 *
 *******************************************************************/
 /* Ack for MAKUO_OP_PING: delegated unchanged to msend_shot(). */
 static void msend_ack_ping(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
 /* Ack for MAKUO_OP_SEND: delegated unchanged to msend_shot(). */
 static void msend_ack_send(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
 /*
  * Ack for MAKUO_OP_MD5.
  *
  * Without a linked entry (m->link is NULL) the ack is sent right away.
  * Otherwise each call reads up to 8192 bytes from the linked entry's file
  * and feeds them to its MD5 context; nothing is sent while data keeps
  * arriving or when read() was interrupted (EINTR).
  *
  * When read() fails the linked entry records errno and
  * MAKUO_RECVSTATE_READERROR.  At end of file the digest is compared with
  * the first 16 bytes of the linked entry's data, giving
  * MAKUO_RECVSTATE_MD5OK or MAKUO_RECVSTATE_MD5NG.  In both cases the result
  * is copied into m, the file is closed, the link is cut on both sides and
  * the ack is sent with msend_shot().
  */
 static void msend_ack_md5(int s, mfile *m)
 {
   unsigned char digest[16];
   unsigned char chunk[8192];
   mfile *src = m->link;
   int    n;

   if(src == NULL){
     msend_shot(s, m);
     return;
   }

   n = read(src->fd, chunk, sizeof(chunk));
   if(n > 0){
     MD5_Update(&(src->md5), chunk, n);
     return;
   }
   if((n == -1) && (errno == EINTR)){
     return;
   }

   if(n == -1){
     src->mdata.head.error  = errno;
     src->mdata.head.nstate = MAKUO_RECVSTATE_READERROR;
     lprintf(0, "[error] %s: file read error %s\n", __func__, src->fn);
     MD5_Final(digest, &(src->md5));
   }else{
     MD5_Final(digest, &(src->md5));
     if(memcmp(digest, src->mdata.data, 16) == 0){
       src->mdata.head.nstate = MAKUO_RECVSTATE_MD5OK;
     }else{
       src->mdata.head.nstate = MAKUO_RECVSTATE_MD5NG;
     }
   }

   /* publish the verdict in the ack and release the source entry */
   m->mdata.head.error  = src->mdata.head.error;
   m->mdata.head.nstate = src->mdata.head.nstate;
   close(src->fd);
   src->fd   = -1;
   src->link = NULL;
   m->link   = NULL;
   msend_shot(s, m);
 }
 
3eaafa57
 /* Ack for MAKUO_OP_DSYNC: delegated unchanged to msend_shot(). */
 static void msend_ack_dsync(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
874955ec
 /* Ack for MAKUO_OP_DEL: delegated unchanged to msend_shot(). */
 static void msend_ack_del(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
8f9aeac1
 /*
  * Dispatch an acknowledgement entry to the handler for its opcode.
  * MAKUO_OP_EXIT and unknown opcodes send nothing.
  */
 static void msend_ack(int s, mfile *m)
 {
   switch(m->mdata.head.opcode){
     case MAKUO_OP_PING:  msend_ack_ping(s, m);  break;
     case MAKUO_OP_EXIT:  /* no ack is sent */   break;
     case MAKUO_OP_SEND:  msend_ack_send(s, m);  break;
     case MAKUO_OP_MD5:   msend_ack_md5(s, m);   break;
     case MAKUO_OP_DSYNC: msend_ack_dsync(s, m); break;
     case MAKUO_OP_DEL:   msend_ack_del(s, m);   break;
     /* add handlers for new opcodes here */
     default:
       break;
   }
 }
 
8f9aeac1
 /******************************************************************
 *
 * req send functions (for source node tasks)
 *
 *******************************************************************/
dfd4c771
 /*
  * First step of the BREAK state of a send request: leave the init phase,
  * start waiting, reset the acks with ack_clear(m, -1) and send the packet.
  */
 static void msend_req_send_break_init(int s, mfile *m)
 {
   m->initstate = 0;
   m->sendwait  = 1;
   ack_clear(m, -1);
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
8f9aeac1
 /*
  * BREAK state of a send request: run the init step while initstate is set,
  * resend while sendwait is set, and delete the entry afterwards.
  */
 static void msend_req_send_break(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_send_break_init(s, m);
   }else if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
   }else{
     msend_mfdel(m);
   }
 }
 
8f9aeac1
 /*
  * First step of the STAT state: build and send the file status packet.
  *
  * An entry without a console (m->comm is NULL) is deleted.  Otherwise the
  * payload is assembled as: an mstat record (fields converted with
  * htonl()/htons()), the file name, the link name, and st_rdev as two
  * 32-bit values, high half first.  If the computed size exceeds
  * MAKUO_BUFFER_SIZE an error is reported and nothing is sent.
  *
  * NOTE(review): on the oversize path neither initstate nor nstate is
  * changed here, so the entry stays in the same state -- confirm that the
  * caller handles this.
  */
 static void msend_req_send_stat_init(int s, mfile *m)
 {
   mstat    st;
   uint64_t rdev;
   size_t   fnlen;
   size_t   lnlen;

   if(m->comm == NULL){
     msend_mfdel(m);
     return;
   }

   fnlen = strlen(m->fn);
   lnlen = strlen(m->ln);

   /* size check is done on the header field itself, as it will be sent */
   m->mdata.p = m->mdata.data;
   m->mdata.head.szdata  = sizeof(st);
   m->mdata.head.szdata += fnlen;
   m->mdata.head.szdata += lnlen;
   m->mdata.head.szdata += sizeof(uint64_t);
   if(m->mdata.head.szdata > MAKUO_BUFFER_SIZE){
     lprintf(0, "[error] %s: buffer size over size=%d file=%s\n",   __func__, m->mdata.head.szdata, m->fn);
     cprintf(0, m->comm, "error: buffer size over size=%d file=%s\n", m->mdata.head.szdata, m->fn);
     return;
   }

   st.mode  = htonl(m->fs.st_mode);
   st.uid   = htons(m->fs.st_uid);
   st.gid   = htons(m->fs.st_gid);
   st.sizel = htonl((uint32_t)(m->fs.st_size & 0xFFFFFFFF));
   st.sizeh = htonl((uint32_t)(m->fs.st_size >> 32));
   st.mtime = htonl(m->fs.st_mtime);
   st.ctime = htonl(m->fs.st_ctime);
   st.fnlen = htons(fnlen);
   st.lnlen = htons(lnlen);
   rdev = (uint64_t)(m->fs.st_rdev);

   /* restart from an empty payload; the data_safeset*() calls fill it */
   m->mdata.head.szdata = 0;
   data_safeset(&(m->mdata), &st, sizeof(st));
   data_safeset(&(m->mdata), m->fn, fnlen);
   data_safeset(&(m->mdata), m->ln, lnlen);
   data_safeset32(&(m->mdata), (uint32_t)(rdev >> 32));
   data_safeset32(&(m->mdata), (uint32_t)(rdev & 0xFFFFFFFF));

   m->sendwait  = 1;
   m->initstate = 0;
   ack_clear(m, -1);
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
8e09e847
 /*
  * Report which hosts answered MAKUO_RECVSTATE_DELETEOK for m->fn.
  *
  * In dry-run mode every line is prefixed with "(dryrun) " and, if
  * ack_check() returns 1 for DELETEOK and the console's loglevel is 0, a
  * one-line summary is printed to the console.  Then one line per matching
  * host goes to the console (level 1) and to the log (level 1).  For a
  * unicast entry (m->sendto) only the host returned by member_get() for the
  * destination address is considered.
  */
 static void msend_req_send_stat_delete_report(mfile *m)
 {
   uint8_t *state;
   mhost   *host;
   char    *prefix = "";

   if(m->dryrun){
     prefix = "(dryrun) ";
     if((ack_check(m, MAKUO_RECVSTATE_DELETEOK) == 1) && m->comm && (m->comm->loglevel == 0)){
       cprintf(0, m->comm, "%s[delete:%s]\n", prefix, m->fn);
     }
   }

   for(host = members; host; host = host->next){
     if(m->sendto && (host != member_get(&(m->addr.sin_addr)))){
       continue;
     }
     state = get_hoststate(host, m);
     if((state == NULL) || (*state != MAKUO_RECVSTATE_DELETEOK)){
       continue;
     }
     cprintf(1, m->comm, "%sdelete %s:%s\n", prefix, host->hostname, m->fn);
     lprintf(1, "%sdelete %s:%s\n", prefix, host->hostname, m->fn);
   }
 }
 
 /*
  * Report the per-host answer to a STAT request for m->fn.
  *
  * In dry-run mode every line is prefixed with "(dryrun) " and, if
  * ack_check() returns 1 for MAKUO_RECVSTATE_UPDATE and the console's
  * loglevel is 0, a one-line summary is printed to the console.  Then, per
  * host, the states UPDATE, SKIP and READONLY are reported as "update",
  * "skip" and "skipro" to the console (levels 1, 2, 3) and to the log
  * (levels 1, 5, 6).  For a unicast entry (m->sendto) only the host returned
  * by member_get() for the destination address is considered.
  */
 static void msend_req_send_stat_update_report(mfile *m)
 {
   mhost   *host;
   uint8_t *state;
   char    *prefix = "";

   if(m->dryrun){
     prefix = "(dryrun) ";
     if((ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1) && m->comm && (m->comm->loglevel == 0)){
       cprintf(0, m->comm, "%s[update:%s]\n", prefix, m->fn);
     }
   }

   for(host = members; host; host = host->next){
     if(m->sendto && (host != member_get(&(m->addr.sin_addr)))){
       continue;
     }
     state = get_hoststate(host, m);
     if(state == NULL){
       continue;
     }
     if(*state == MAKUO_RECVSTATE_UPDATE){
       cprintf(1, m->comm, "%supdate %s:%s\r\n", prefix, host->hostname, m->fn);
       lprintf(1, "%supdate %s:%s\n", prefix, host->hostname, m->fn);
     }else if(*state == MAKUO_RECVSTATE_SKIP){
       cprintf(2, m->comm, "%sskip   %s:%s\r\n", prefix, host->hostname, m->fn);
       lprintf(5, "%sskip   %s:%s\n", prefix, host->hostname, m->fn);
     }else if(*state == MAKUO_RECVSTATE_READONLY){
       cprintf(3, m->comm, "%sskipro %s:%s\r\n", prefix, host->hostname, m->fn);
       lprintf(6, "%sskipro %s:%s\n", prefix, host->hostname, m->fn);
     }
   }
 }
 
 /*
  * STAT state of a send request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  After that the answers are reported and the next state is chosen:
  *   - MAKUO_FLAG_SYNC set: delete report, then LAST;
  *   - otherwise: update report, then CLOSE for a dry run, OPEN when
  *     ack_check() returns 1 for MAKUO_RECVSTATE_UPDATE, CLOSE if not.
  * The previous state is kept in head.ostate and initstate is set again.
  */
 static void msend_req_send_stat(int s, mfile *m)
 {
   int sync;

   if(m->initstate){
     msend_req_send_stat_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }

   sync = (m->mdata.head.flags & MAKUO_FLAG_SYNC) != 0;
   if(sync){
     msend_req_send_stat_delete_report(m);
   }else{
     msend_req_send_stat_update_report(m);
   }

   m->initstate = 1;
   m->mdata.head.ostate = m->mdata.head.nstate;

   if(sync){
     m->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
   }else if(m->dryrun){
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   }else if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
     m->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
   }else{
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   }
 }
 
8f9aeac1
 /*
  * First step of the OPEN state.
  *
  * Symbolic links and every other non-regular file have no content to read,
  * so the packet is sent directly.  A regular file is opened read-only
  * first; if open() fails the error is reported to console and log and the
  * entry is moved to CLOSE (initstate set, sendwait cleared) without
  * sending.
  */
 static void msend_req_send_open_init(int s, mfile *m)
 {
   int saved;

   m->sendwait  = 1;
   m->initstate = 0;
   ack_clear(m, MAKUO_RECVSTATE_UPDATE);

   if(S_ISLNK(m->fs.st_mode) || !S_ISREG(m->fs.st_mode)){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }

   m->fd = open(m->fn, O_RDONLY, 0);
   if(m->fd != -1){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }

   /* keep errno: the calls below may change it */
   saved = errno;
   m->sendwait  = 0;
   m->initstate = 1;
   m->mdata.head.ostate = m->mdata.head.nstate;
   m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   cprintf(0, m->comm, "error: %s %s\n", strerror(saved), m->fn);
   lprintf(0, "[error] %s: %s %s\n", __func__,   strerror(saved), m->fn);
 }
 
8f9aeac1
 /*
  * OPEN state of a send request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  If ack_check() still returns 1 for MAKUO_RECVSTATE_UPDATE the wait
  * is restarted after clearing that ack.  Otherwise the state advances:
  * non-regular files go to CLOSE (with initstate set), regular files go to
  * DATA starting at sequence number 0.
  */
 static void msend_req_send_open(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_send_open_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
     m->sendwait = 1;
     ack_clear(m, MAKUO_RECVSTATE_UPDATE);
     return;
   }

   m->mdata.head.ostate = m->mdata.head.nstate;
   if(S_ISLNK(m->fs.st_mode) || !S_ISREG(m->fs.st_mode)){
     m->initstate = 1;
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
     return;
   }
   m->mdata.head.seqno  = 0;
   m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
 }
 
abce546a
 /*
  * Resend one block that is recorded in the mark list of m.
  *
  * With an empty mark list the entry moves to CLOSE.  Otherwise the
  * sequence number returned by seq_getmark() selects the block: the file is
  * positioned at seqno * MAKUO_BUFFER_SIZE, up to MAKUO_BUFFER_SIZE bytes
  * are read and sent.  When the mark list is empty after that send the
  * entry moves to MARK.
  *
  * A failed lseek() is handled like a failed read().  End of file or a read
  * error is reported to log and console and moves the entry to BREAK.
  * errno is captured once, right after the failing call, so both messages
  * show the same reason.
  */
 static void msend_req_send_markdata(int s, mfile *m)
 {
   int   r;
   int   e;
   off_t offset;

   if(!m->mark){
     /* close */
     m->initstate = 1;
     m->mdata.head.ostate = m->mdata.head.nstate;
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
     return;
   }
   m->mdata.head.seqno = seq_getmark(m);
   offset  = m->mdata.head.seqno;
   offset *= MAKUO_BUFFER_SIZE;
   if(lseek(m->fd, offset, SEEK_SET) == (off_t)-1){
     /* do not read from an unknown position */
     r = -1;
   }else{
     r = read(m->fd, m->mdata.data, MAKUO_BUFFER_SIZE);
   }
   e = errno;
   if(r > 0){
     m->mdata.head.szdata = r;
     msend_packet(s, &(m->mdata), &(m->addr));
     if(!m->mark){
       m->initstate = 1;
       m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
     }
     return;
   }
   if(!r){
     lprintf(0, "[error] %s: read eof? seqno=%d %s\n", __func__, m->mdata.head.seqno, m->fn);
     cprintf(0, m->comm, "error: read eof? seqno=%d %s\n", m->mdata.head.seqno, m->fn);
   }else{
     lprintf(0, "[error] %s: can't read (%s) seqno=%d %s\n",   __func__, strerror(e), m->mdata.head.seqno, m->fn);
     cprintf(0, m->comm, "error: can't read (%s) seqno=%d %s\n", strerror(e), m->mdata.head.seqno, m->fn);
   }
   m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
   m->initstate = 1;
 }
 
 /*
  * Send the next data block of the file behind m.
  *
  * The block number comes from seq_getmark() while the mark list is not
  * empty, otherwise from the running counter m->seqnonow (post-increment).
  * The file is positioned at seqno * MAKUO_BUFFER_SIZE and up to
  * MAKUO_BUFFER_SIZE bytes are read and sent.
  *
  * At end of file the entry moves to MARK with seqno reset to 0 and
  * lickflag set.  A failed lseek() is handled like a failed read(): the
  * error is reported to log and console and the entry moves to BREAK.
  * errno is captured once, right after the failing call, so both messages
  * show the same reason.
  */
 static void msend_req_send_filedata(int s, mfile *m)
 {
   off_t offset;
   int   readsize;
   int   e;

   if(m->mark){
     m->mdata.head.seqno = seq_getmark(m);
   }else{
     m->mdata.head.seqno = m->seqnonow++;
   }
   offset  = m->mdata.head.seqno;
   offset *= MAKUO_BUFFER_SIZE;
   if(lseek(m->fd, offset, SEEK_SET) == (off_t)-1){
     /* do not read from an unknown position */
     readsize = -1;
   }else{
     readsize = read(m->fd, m->mdata.data, MAKUO_BUFFER_SIZE);
   }
   e = errno;
   if(readsize > 0){
     m->mdata.head.szdata = readsize;
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(readsize == -1){
     /* err */
     lprintf(0, "[error] %s: can't read (%s) seqno=%d %s\n",   __func__, strerror(e), m->mdata.head.seqno, m->fn);
     cprintf(0, m->comm, "error: can't read (%s) seqno=%d %s\n", strerror(e), m->mdata.head.seqno, m->fn);
     m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
     m->initstate = 1;
   }else{
     /* eof */
     lprintf(9, "%s: block send count=%d %s\n", __func__, m->mdata.head.seqno, m->fn);
     m->mdata.head.seqno  = 0;
     m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
     m->initstate = 1;
     m->lickflag  = 1;
   }
 }
 
91adf830
 /*
  * DATA state of a send request: once lickflag is set only marked blocks
  * are resent, before that the file is streamed block by block.
  */
 static void msend_req_send_data(int s, mfile *m)
 {
   if(!m->lickflag){
     msend_req_send_filedata(s, m); /* send data  */
     return;
   }
   msend_req_send_markdata(s, m);   /* send retry */
 }
 
 /*
  * First step of the MARK state: start waiting, clear the UPDATE, OPEN and
  * MARK acks (in that order) and send the packet.
  */
 static void msend_req_send_mark_init(int s, mfile *m)
 {
   m->initstate = 0;
   m->sendwait  = 1;
   ack_clear(m, MAKUO_RECVSTATE_UPDATE);
   ack_clear(m, MAKUO_RECVSTATE_OPEN);
   ack_clear(m, MAKUO_RECVSTATE_MARK);
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
91adf830
 /*
  * MARK state of a send request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  The init step is repeated as long as ack_check() returns 1 for
  * MAKUO_RECVSTATE_UPDATE or MAKUO_RECVSTATE_OPEN; after that the entry
  * returns to DATA.
  */
 static void msend_req_send_mark(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_send_mark_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if((ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1) ||
      (ack_check(m, MAKUO_RECVSTATE_OPEN)   == 1)){
     msend_req_send_mark_init(s, m);
     return;
   }
   m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
 }
 
91adf830
 /*
  * First step of the CLOSE state: start waiting, clear the UPDATE, OPEN and
  * MARK acks (in that order) and send the packet.
  */
 static void msend_req_send_close_init(int s, mfile *m)
 {
   m->initstate = 0;
   m->sendwait  = 1;
   ack_clear(m, MAKUO_RECVSTATE_UPDATE);
   ack_clear(m, MAKUO_RECVSTATE_OPEN);
   ack_clear(m, MAKUO_RECVSTATE_MARK);
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
91adf830
 /*
  * CLOSE state of a send request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  The init step is repeated as long as ack_check() returns 1 for
  * MAKUO_RECVSTATE_UPDATE, OPEN or MARK.  Afterwards a completion line is
  * logged when the previous state (head.ostate) was MARK, DATA or OPEN, and
  * the entry moves to LAST with initstate set.
  */
 static void msend_req_send_close(int s, mfile *m)
 {
   int came_from_transfer;

   if(m->initstate){
     msend_req_send_close_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if((ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1) ||
      (ack_check(m, MAKUO_RECVSTATE_OPEN)   == 1) ||
      (ack_check(m, MAKUO_RECVSTATE_MARK)   == 1)){
     msend_req_send_close_init(s, m);
     return;
   }

   came_from_transfer = (m->mdata.head.ostate == MAKUO_SENDSTATE_MARK) ||
                        (m->mdata.head.ostate == MAKUO_SENDSTATE_DATA) ||
                        (m->mdata.head.ostate == MAKUO_SENDSTATE_OPEN);
   if(came_from_transfer){
     lprintf(4, "update complate %s\n", m->fn);
   }
   m->initstate = 1;
   m->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
 }
 
dfd4c771
 /*
  * First step of the LAST state: start waiting, reset the acks with
  * ack_clear(m, -1) and send the packet.
  */
 static void msend_req_send_last_init(int s, mfile *m)
 {
   m->initstate = 0;
   m->sendwait  = 1;
   ack_clear(m, -1);
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
 /*
  * LAST state of a send request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  The init step is repeated as long as ack_check() returns 1 for
  * MAKUO_RECVSTATE_SKIP or MAKUO_RECVSTATE_CLOSE; after that the entry is
  * deleted.
  */
 static void msend_req_send_last(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_send_last_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if((ack_check(m, MAKUO_RECVSTATE_SKIP)  == 1) ||
      (ack_check(m, MAKUO_RECVSTATE_CLOSE) == 1)){
     msend_req_send_last_init(s, m);
     return;
   }
   msend_mfdel(m);
 }
 
8f9aeac1
 /*----- send -----*/
 /*
  * State machine driver for MAKUO_OP_SEND requests.
  *
  * An entry whose console is gone (m->comm is NULL) is forced into the
  * BREAK state, with initstate set, unless it is already there.  The
  * handler for the current head.nstate is then called.
  */
 static void msend_req_send(int s, mfile *m)
 {
   if((m->comm == NULL) && (m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK)){
     m->initstate = 1;
     m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
   }

   switch(m->mdata.head.nstate){
     case MAKUO_SENDSTATE_STAT:  msend_req_send_stat(s, m);  break;
     case MAKUO_SENDSTATE_OPEN:  msend_req_send_open(s, m);  break;
     case MAKUO_SENDSTATE_DATA:  msend_req_send_data(s, m);  break;
     case MAKUO_SENDSTATE_MARK:  msend_req_send_mark(s, m);  break;
     case MAKUO_SENDSTATE_CLOSE: msend_req_send_close(s, m); break;
     case MAKUO_SENDSTATE_LAST:  msend_req_send_last(s, m);  break;
     case MAKUO_SENDSTATE_BREAK: msend_req_send_break(s, m); break;
     default:
       break;
   }
 }
 
8f9aeac1
 /*
  * First step of the OPEN state of an MD5 request.
  *
  * Each call reads up to 8192 bytes from m->fd into the MD5 context and
  * returns; an interrupted read (EINTR) also just returns.  A read error is
  * reported to log and console, the digest is finalised, the file is closed
  * and the entry is deleted.  At end of file the digest is written to the
  * hash field of the mhash record at the start of the payload, the file is
  * closed and the request packet is sent with nstate set to OPEN.
  */
 static void msend_req_md5_open_init(int s, mfile *m)
 {
   char   chunk[8192];
   mhash *out = (mhash *)m->mdata.data;
   int    saved;
   int    n;

   n = read(m->fd, chunk, sizeof(chunk));
   if(n > 0){
     MD5_Update(&(m->md5), chunk, n);
     return;
   }
   saved = errno;

   if(n == -1){
     if(saved == EINTR){
       return;
     }
     lprintf(0, "[error] %s: file read error(%s) %s\n", __func__, strerror(saved), m->fn);
     cprintf(0, m->comm, "error: file read error(%s) %s\n", strerror(saved), m->fn);
     MD5_Final(out->hash, &(m->md5));
     close(m->fd);
     m->fd = -1;
     msend_mfdel(m);
     return;
   }

   /* end of file: digest complete, start the exchange */
   MD5_Final(out->hash, &(m->md5));
   close(m->fd);
   m->fd = -1;
   m->sendwait  = 1;
   m->initstate = 0;
   ack_clear(m, -1);
   m->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
8f9aeac1
 /*
  * OPEN state of an MD5 request.
  *
  * Runs the init step while initstate is set and resends while sendwait is
  * set.  If ack_check() returns 1 for MAKUO_RECVSTATE_OPEN the wait is
  * restarted after clearing that ack; otherwise the entry moves to CLOSE
  * with initstate set.
  */
 static void msend_req_md5_open(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_md5_open_init(s, m);
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(ack_check(m, MAKUO_RECVSTATE_OPEN) == 1){
     m->initstate = 0;
     m->sendwait  = 1;
     ack_clear(m, MAKUO_RECVSTATE_OPEN);
     return;
   }
   m->sendwait  = 0;
   m->initstate = 1;
   m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
 }
 
8f9aeac1
 /*
  * First step of the CLOSE state of an MD5 request: start waiting, clear
  * the MD5OK and MD5NG acks and send the packet with nstate set to CLOSE.
  */
 static void msend_req_md5_close_init(int s, mfile *m)
 {
   m->initstate = 0;
   m->sendwait  = 1;
   ack_clear(m, MAKUO_RECVSTATE_MD5OK);
   ack_clear(m, MAKUO_RECVSTATE_MD5NG);
   m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
8f9aeac1
 /*
  * CLOSE state of an MD5 request: run the init step while initstate is set,
  * resend while sendwait is set, and delete the entry afterwards.
  */
 static void msend_req_md5_close(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_md5_close_init(s, m);
   }else if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
   }else{
     msend_mfdel(m);
   }
 }
 
8f9aeac1
 /*----- md5 -----*/
 /*
  * State machine driver for MAKUO_OP_MD5 requests.
  *
  * When the console is gone (m->comm is NULL) and the entry is in OPEN:
  *   - still in the init phase: the digest is finalised, the file is closed
  *     and the entry is deleted without sending anything;
  *   - past the init phase: the entry is switched to CLOSE with initstate
  *     set.
  * The handler for the resulting head.nstate is then called.
  */
 static void msend_req_md5(int s, mfile *m)
 {
   if((m->comm == NULL) && (m->mdata.head.nstate == MAKUO_SENDSTATE_OPEN)){
     if(m->initstate){
       MD5_Final(m->mdata.data, &(m->md5));
       close(m->fd);
       m->fd = -1;
       msend_mfdel(m);
       return;
     }
     m->initstate = 1;
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   }

   switch(m->mdata.head.nstate){
     case MAKUO_SENDSTATE_OPEN:  msend_req_md5_open(s, m);  break;
     case MAKUO_SENDSTATE_CLOSE: msend_req_md5_close(s, m); break;
     default:
       break;
   }
 }
 
3eaafa57
 /*
  * OPEN state of a dsync request: the first call sends the packet and
  * starts waiting, later calls resend while sendwait is set, and after that
  * the entry moves to DATA with initstate set.
  */
 static void msend_req_dsync_open(int s, mfile *m)
 {
   if(m->initstate){
     m->initstate = 0;
     m->sendwait  = 1;
   }else if(!m->sendwait){
     m->initstate = 1;
     m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
     return;
   }
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
 /*
  * Fill the payload of a dsync DATA packet with exclude patterns.
  *
  * head.seqno is used as the index of the first pattern of m->comm->exclude
  * that has not been packed yet.  Starting there, each pattern is appended
  * as a 16-bit length followed by the pattern bytes until the list ends or
  * the next record would exceed MAKUO_BUFFER_SIZE; seqno advances by one
  * per packed pattern.  If nothing was packed seqno is still advanced by
  * one and the payload stays empty.
  *
  * The packet is not sent here; s is unused.
  */
 static void msend_req_dsync_data_init(int s, mfile *m)
 {
   excludeitem *item;
   uint16_t     patlen;
   int          skipped = 0;

   m->sendwait  = 1;
   m->initstate = 0;

   /* walk to the entry whose index equals head.seqno */
   for(item = m->comm->exclude; item && (skipped != m->mdata.head.seqno); item = item->next){
     skipped++;
   }

   m->mdata.head.szdata = 0;
   for(; item; item = item->next){
     patlen = strlen(item->pattern);
     if(m->mdata.head.szdata + sizeof(uint16_t) + patlen > MAKUO_BUFFER_SIZE){
       break;
     }
     data_safeset16(&(m->mdata), patlen);
     data_safeset(&(m->mdata), item->pattern, patlen);
     m->mdata.head.seqno++;
   }
   if(m->mdata.head.szdata == 0){
     m->mdata.head.seqno++;
   }
 }
 
 /*
  * DATA state of a dsync request.
  *
  * While initstate is set a fresh payload is built and sent; while sendwait
  * is set the same packet is resent.  Afterwards initstate is set again so
  * that the next payload gets built, and if the payload just handled was
  * empty the entry moves to CLOSE.
  */
 static void msend_req_dsync_data(int s, mfile *m)
 {
   if(m->initstate){
     msend_req_dsync_data_init(s, m);
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(m->mdata.head.szdata == 0){
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
   }
   m->initstate = 1;
 }
 
 /*
  * CLOSE state of a dsync request.
  *
  * The first call clears the MAKUO_RECVSTATE_OPEN ack, sends the packet and
  * starts waiting; later calls resend while sendwait is set.  After that,
  * a non-zero ack_check() result for MAKUO_RECVSTATE_OPEN restarts the
  * state (initstate set); otherwise the packet is sent once more and the
  * entry is deleted.
  */
 static void msend_req_dsync_close(int s, mfile *m)
 {
   if(m->initstate){
     m->sendwait  = 1;
     m->initstate = 0;
     ack_clear(m, MAKUO_RECVSTATE_OPEN);
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(m->sendwait){
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(!ack_check(m, MAKUO_RECVSTATE_OPEN)){
     msend_packet(s, &(m->mdata), &(m->addr));
     msend_mfdel(m);
     return;
   }
   m->sendwait  = 0;
   m->initstate = 1;
 }
 
806dbe3b
 /*
  * BREAK state of a dsync request: the first call resets the acks with
  * ack_clear(m, -1), sends the packet and starts waiting; later calls
  * resend while sendwait is set; after that the entry is deleted.
  */
 static void msend_req_dsync_break(int s, mfile *m)
 {
   if(m->initstate){
     m->initstate = 0;
     m->sendwait  = 1;
     ack_clear(m, -1);
   }else if(!m->sendwait){
     msend_mfdel(m);
     return;
   }
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
3eaafa57
 /*----- dsync -----*/
 /*
  * State machine driver for MAKUO_OP_DSYNC requests.
  *
  * An entry whose console is gone (m->comm is NULL) is forced into the
  * BREAK state, with initstate set, unless it is already there.  The
  * handler for the current head.nstate is then called.
  */
 static void msend_req_dsync(int s, mfile *m)
 {
   if((m->comm == NULL) && (m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK)){
     m->initstate = 1;
     m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
   }

   switch(m->mdata.head.nstate){
     case MAKUO_SENDSTATE_OPEN:  msend_req_dsync_open(s, m);  break;
     case MAKUO_SENDSTATE_DATA:  msend_req_dsync_data(s, m);  break;
     case MAKUO_SENDSTATE_CLOSE: msend_req_dsync_close(s, m); break;
     case MAKUO_SENDSTATE_BREAK: msend_req_dsync_break(s, m); break;
     default:
       break;
   }
 }
 
31e550c9
 /* Return path advanced past every leading "./" pair. */
 static char *msend_skip_dotslash(char *path)
 {
   /* path[1] is only inspected when path[0] is '.', so the NUL is never passed */
   while((path[0] == '.') && (path[1] == '/')){
     path += 2;
   }
   return(path);
 }

 /*
  * Check whether a MAKUO_OP_SEND entry in mftop[MFRECV] refers to the same
  * path as m->tn.  Leading "./" components are ignored on both sides.
  *
  * Returns 1 when such an entry exists, 0 otherwise.  s is unused.
  */
 static int msend_req_del_stat_read_pathcmp(int s, mfile *m)
 {
   mfile *other;
   char  *wanted = msend_skip_dotslash(m->tn);

   for(other = mftop[MFRECV]; other; other = other->next){
     if(other->mdata.head.opcode != MAKUO_OP_SEND){
       continue;
     }
     if(strcmp(msend_skip_dotslash(other->tn), wanted) == 0){
       return(1);
     }
   }
   return(0);
 }
 
 /*
  * Read one record from m->pipe and append it to the pending DEL request.
  *
  * The pending request is the entry of mftop[MFSEND] that links to m and is
  * in MAKUO_SENDSTATE_WAIT; if there is none it is created with mkreq() and
  * inherits flags, dryrun and recurs from m.
  *
  * A record left over from an earlier call (m->len still holding at least
  * sizeof(m->mod)) is appended first.  Then records are read as: m->len,
  * m->mod, followed by (m->len - sizeof(m->mod)) bytes of name stored
  * NUL-terminated in m->tn.  Names for which lstat() reports ENOENT, and
  * names matched by msend_req_del_stat_read_pathcmp(), are skipped.  The
  * first remaining record is appended as length, mode and name and the
  * function returns; if it does not fit into MAKUO_BUFFER_SIZE the request
  * is switched to OPEN instead and m->len is kept for the next call.
  *
  * EAGAIN on the length read returns without changes.  Any other non-zero
  * result of the length read, a length not larger than sizeof(m->mod), or a
  * failed mode/name read ends the input: the request is switched to OPEN,
  * the pipe is closed and m gets initstate set and sendwait cleared.
  */
 static void msend_req_del_stat_read(int s, mfile *m)
 {
   int      rc;
   mfile   *req;
   uint16_t namelen;

   /* locate the request that is still collecting records for m */
   for(req = mftop[MFSEND]; req; req = req->next){
     if((req->link == m) && (req->mdata.head.nstate == MAKUO_SENDSTATE_WAIT)){
       break;
     }
   }
   if(req == NULL){
     req = mkreq(&(m->mdata), &(m->addr), MAKUO_SENDSTATE_WAIT);
     req->mdata.head.flags = m->mdata.head.flags;
     req->mdata.head.reqid = getrid();
     req->initstate = 1;
     req->sendwait  = 0;
     req->sendto    = 1;
     req->dryrun    = m->dryrun;
     req->recurs    = m->recurs;
     req->link      = m;
     req->mdata.p   = req->mdata.data;
   }

   /* a record that did not fit last time is stored first */
   if(m->len >= sizeof(m->mod)){
     namelen = m->len - sizeof(m->mod);
     data_safeset16(&(req->mdata), m->len);
     data_safeset32(&(req->mdata), m->mod);
     data_safeset(&(req->mdata), m->tn, namelen);
     m->len = 0;
   }

   for(;;){
     rc = atomic_read(m->pipe, &(m->len), sizeof(m->len), 1);
     if(rc){
       if(rc == -1){
         if(errno == EAGAIN){
           return;
         }
         lprintf(0, "[error] %s: length read error\n", __func__);
       }
       break;
     }
     if(m->len <= sizeof(m->mod)){
       lprintf(0, "[error] %s: length error\n", __func__);
       break;
     }
     namelen = m->len - sizeof(m->mod);
     if(atomic_read(m->pipe, &(m->mod), sizeof(m->mod), 0)){
       lprintf(0, "[error] %s: filemode read error\n", __func__);
       break;
     }
     if(atomic_read(m->pipe, m->tn, namelen, 0)){
       lprintf(0, "[error] %s: filename read error\n", __func__);
       break;
     }
     m->tn[namelen] = 0;

     if((lstat(m->tn, &(m->fs)) == -1) && (errno == ENOENT)){
       m->len = 0;
       continue;
     }
     if(msend_req_del_stat_read_pathcmp(s, m)){
       m->len = 0;
       continue;
     }

     if(req->mdata.head.szdata + sizeof(m->len) + m->len > MAKUO_BUFFER_SIZE){
       /* buffer full: send what is collected, keep this record in m */
       req->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
     }else{
       strcpy(req->fn, m->tn);
       data_safeset16(&(req->mdata), m->len);
       data_safeset32(&(req->mdata), m->mod);
       data_safeset(&(req->mdata), m->tn, namelen);
       m->len = 0;
     }
     return;
   }

   /* input finished or failed */
   req->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
   close(m->pipe);
   m->pipe      = -1;
   m->initstate =  1;
   m->sendwait  =  0;
 }
 
31e550c9
 /*
  * Return 1 while the list mftop[0] still contains a MAKUO_OP_DEL entry
  * linked to m, 0 otherwise.  s is unused.
  *
  * NOTE(review): the list is indexed with a literal 0 here while sibling
  * functions use MFSEND -- confirm both name the same list.
  */
 static int msend_req_del_stat_waitcheck(int s, mfile *m)
 {
   mfile *entry = mftop[0];

   while(entry){
     if((entry->link == m) && (entry->mdata.head.opcode == MAKUO_OP_DEL)){
       return(1);
     }
     entry = entry->next;
   }
   return(0);
 }
 
 /*
  * STAT state of a DEL request.
  *
  * While the pipe is open, records are read from it.  Once it is closed the
  * entry waits (sendwait set) as long as msend_req_del_stat_waitcheck()
  * reports a linked DEL entry.  After that a MAKUO_RECVSTATE_CLOSE ack
  * built from the linked entry is sent if m->link is set, and the process
  * m->pid is polled with waitpid(WNOHANG): if it has not been reaped yet
  * the entry keeps waiting, otherwise pid is cleared and the entry is
  * deleted.
  */
 static void msend_req_del_stat(int s, mfile *m)
 {
   if(m->pipe != -1){
     msend_req_del_stat_read(s, m);
     return;
   }
   if(msend_req_del_stat_waitcheck(s, m)){
     m->sendwait = 1;
     return;
   }
   if(m->link){
     msend(mkack(&(m->link->mdata), &(m->link->addr), MAKUO_RECVSTATE_CLOSE)); /* send ack for dsync */
   }
   if(waitpid(m->pid, NULL, WNOHANG) == m->pid){
     /* pid cleared so that msend_mfdel() does not signal it again */
     m->pid = 0;
     msend_mfdel(m);
     return;
   }
   m->sendwait = 1;
 }
f31a5410
 
e42e0b01
 /*
  * BREAK state of a DEL request: the first entry of mftop[MFSEND] that
  * links to m and is in MAKUO_SENDSTATE_WAIT is unlinked and deleted, then
  * m itself is deleted and the break is logged.  s is unused.
  */
 static void msend_req_del_break(int s, mfile *m)
 {
   mfile *pending;

   for(pending = mftop[MFSEND]; pending; pending = pending->next){
     if((pending->link == m) && (pending->mdata.head.nstate == MAKUO_SENDSTATE_WAIT)){
       break;
     }
   }
   if(pending){
     /* cut the link first so that msend_mfdel() leaves m untouched */
     pending->link = NULL;
     msend_mfdel(pending);
   }
   msend_mfdel(m);
   lprintf(0,"%s: break dsync\n", __func__);
 }
 
 /*
  * OPEN state of a DEL request: the first call resets the acks with
  * ack_clear(m, -1), sends the packet and starts waiting; later calls
  * resend while sendwait is set; after that initstate is set again.
  */
 static void msend_req_del_open(int s, mfile *m)
 {
   if(m->initstate){
     m->initstate = 0;
     m->sendwait  = 1;
     ack_clear(m, -1);
   }else if(!m->sendwait){
     m->initstate = 1;
     return;
   }
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
 /*
  * DATA state of a DEL request: the first call resets the acks with
  * ack_clear(m, -1), sends the packet and starts waiting; later calls
  * resend while sendwait is set; after that the entry moves to CLOSE with
  * initstate set and sendwait cleared.
  */
 static void msend_req_del_data(int s, mfile *m)
 {
   if(m->initstate){
     m->initstate = 0;
     m->sendwait  = 1;
     ack_clear(m, -1);
   }else if(!m->sendwait){
     m->initstate = 1;
     m->sendwait  = 0;
     m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
     return;
   }
   msend_packet(s, &(m->mdata), &(m->addr));
 }
 
 /*
  * CLOSE state of a DEL request.
  *
  * The first call resets the acks with ack_clear(m, -1), sends the packet
  * and starts waiting; later calls resend while sendwait is set.  After
  * that the linked entry, if any, has its sendwait cleared, the link is
  * dropped and m is deleted.  Each step writes a numbered trace line to
  * the log.
  *
  * The request id is copied before msend_mfdel() because m must not be
  * dereferenced once it has been handed to mfdel().
  */
 static void msend_req_del_close(int s, mfile *m)
 {
   uint32_t rid;

   lprintf(0, "%s: 0 rid=%d\n", __func__, m->mdata.head.reqid);
   if(m->initstate){
     lprintf(0, "%s: 1 rid=%d\n", __func__, m->mdata.head.reqid);
     m->initstate = 0;
     m->sendwait  = 1;
     ack_clear(m, -1);
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(m->sendwait){
     lprintf(0, "%s: 2 rid=%d\n", __func__, m->mdata.head.reqid);
     msend_packet(s, &(m->mdata), &(m->addr));
     return;
   }
   if(m->link){
     lprintf(0, "%s: 3 rid=%d\n", __func__, m->mdata.head.reqid);
     m->link->sendwait = 0;
   }
   m->link = NULL;
   rid = m->mdata.head.reqid;
   msend_mfdel(m);
   lprintf(0, "%s: 4 rid=%d\n", __func__, rid);
 }
 
 /*----- del -----*/
 /*
  * State machine driver for MAKUO_OP_DEL requests: calls the handler for
  * the current head.nstate; other states do nothing.
  */
 static void msend_req_del(int s, mfile *m)
 {
   switch(m->mdata.head.nstate){
     case MAKUO_SENDSTATE_STAT:  msend_req_del_stat(s, m);  break;
     case MAKUO_SENDSTATE_BREAK: msend_req_del_break(s, m); break;
     case MAKUO_SENDSTATE_OPEN:  msend_req_del_open(s, m);  break;
     case MAKUO_SENDSTATE_DATA:  msend_req_del_data(s, m);  break;
     case MAKUO_SENDSTATE_CLOSE: msend_req_del_close(s, m); break;
     default:
       break;
   }
 }
 
8f9aeac1
 /*----- exit -----*/
 /* Request for MAKUO_OP_EXIT: delegated unchanged to msend_shot(). */
 static void msend_req_exit(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
8f9aeac1
 /*----- ping -----*/
 /* Request for MAKUO_OP_PING: delegated unchanged to msend_shot(). */
 static void msend_req_ping(int s, mfile *m)
 {
   msend_shot(s, m);
 }
 
 /*----- send request -----*/
 /*
  * Dispatch a request entry to the handler for its opcode.  Unknown
  * opcodes do nothing.
  */
 static void msend_req(int s, mfile *m)
 {
   switch(m->mdata.head.opcode){
     case MAKUO_OP_PING:  msend_req_ping(s, m);  break;
     case MAKUO_OP_EXIT:  msend_req_exit(s, m);  break;
     case MAKUO_OP_SEND:  msend_req_send(s, m);  break;
     case MAKUO_OP_MD5:   msend_req_md5(s, m);   break;
     case MAKUO_OP_DSYNC: msend_req_dsync(s, m); break;
     case MAKUO_OP_DEL:   msend_req_del(s, m);   break;
     /* add handlers for new opcodes here */
     default:
       break;
   }
 }
8f9aeac1
 
825350fc
 /******************************************************************
 *
 * send common functions (public)
 *
 *******************************************************************/
d6e8005b
 /*
  * Public entry point: process one entry of the send queue.
  *
  * NULL is ignored.  Entries carrying MAKUO_FLAG_ACK go to msend_ack().
  * Every other entry first passes msend_retry(); when that returns 0 the
  * send time is stamped and the entry is handed to msend_req().
  *
  * The timestamp is taken before msend_req() because the request handlers
  * may delete m (for example through msend_shot() or msend_mfdel()), after
  * which m->lastsend must not be written any more.
  */
 void msend(mfile *m)
 {
   if(!m){
     return;
   }
   if(m->mdata.head.flags & MAKUO_FLAG_ACK){
     msend_ack(moption.mcsocket, m);
     return;
   }
   if(msend_retry(m)){
     return;
   }
   mtimeget(&m->lastsend);
   msend_req(moption.mcsocket, m);
 }
36959136
 
27b2005f
 void msend_clean()
 {
eeef442d
   mfile *m = mftop[MFSEND];
1b4c64d4
   while((m = msend_mfdel(m)));
27b2005f
 }