Masanobu Yasui authored on 2008/12/24 08:27:12
Showing 6 changed files
... ...
@@ -1104,7 +1104,9 @@ mfile *mkack(mdata *data, struct sockaddr_in *addr, uint8_t state)
1104 1104
     a->mdata.head.opcode = data->head.opcode;
1105 1105
     a->mdata.head.reqid  = data->head.reqid;
1106 1106
     a->mdata.head.seqno  = data->head.seqno;
1107
+    a->mdata.head.ostate = data->head.ostate;
1107 1108
     a->mdata.head.nstate = state;
1109
+    a->mdata.head.error  = data->head.error;
1108 1110
     memcpy(&(a->addr), addr, sizeof(a->addr));
1109 1111
   }
1110 1112
   return(a);
... ...
@@ -248,14 +248,19 @@ int mloop()
248 248
   lastpong = pingpong(0);
249 249
   while(loop_flag){
250 250
     gettimeofday(&curtime, NULL);
251
-    if(mtimeout(lastpong, MAKUO_PONG_INTERVAL))
251
+    if(mtimeout(lastpong, MAKUO_PONG_INTERVAL)){
252 252
       lastpong = pingpong(1);
253
+    }
254
+
255
+    /* udp packet receive */
253 256
     m = mftop[0];
254 257
     while(mrecv(moption.mcsocket)){
255 258
       if(m != mftop[0]){
256 259
         break;
257 260
       }
258 261
     }
262
+
263
+    /* udp packet send */
259 264
     para = 0;
260 265
     m = mftop[0];
261 266
     while(m){
... ...
@@ -266,20 +271,21 @@ int mloop()
266 266
         break;
267 267
       }
268 268
     }
269
+
270
+    /* command read */
269 271
     mcomm_check(moption.comm);
270 272
 
273
+    /* wait */
271 274
     FD_ZERO(&rfds);
272 275
     FD_ZERO(&wfds);
273 276
     FD_SET(moption.mcsocket,  &rfds);
274 277
     mcomm_fdset(moption.comm, &rfds);
275
-
276 278
     for(m=mftop[0];m;m=m->next){
277 279
       if(ismsend(moption.mcsocket, m, 0)){
278 280
         FD_SET(moption.mcsocket, &wfds);
279 281
         break;
280 282
       }
281 283
     }
282
-
283 284
     tv.tv_sec  = 1;
284 285
     tv.tv_usec = 0;
285 286
     if(select(1024, &rfds, &wfds, NULL, &tv) == -1)
... ...
@@ -72,10 +72,9 @@
72 72
 /*----- flags -----*/
73 73
 #define MAKUO_FLAG_ACK    1
74 74
 #define MAKUO_FLAG_CRYPT  2
75
-#define MAKUO_FLAG_FMARK  4
76
-#define MAKUO_FLAG_DRYRUN 8
77
-#define MAKUO_FLAG_RECURS 16
78
-#define MAKUO_FLAG_SYNC   32
75
+#define MAKUO_FLAG_DRYRUN 4
76
+#define MAKUO_FLAG_RECURS 8
77
+#define MAKUO_FLAG_SYNC   16
79 78
 
80 79
 /*----- sendstatus -----*/
81 80
 #define MAKUO_SENDSTATE_STAT       0  /* 更新確認待 */
... ...
@@ -97,6 +96,7 @@
97 97
 #define MAKUO_RECVSTATE_IGNORE     6
98 98
 #define MAKUO_RECVSTATE_READONLY   7
99 99
 #define MAKUO_RECVSTATE_BREAK      8
100
+#define MAKUO_RECVSTATE_LAST       9
100 101
 #define MAKUO_RECVSTATE_MD5OK      10
101 102
 #define MAKUO_RECVSTATE_MD5NG      11
102 103
 #define MAKUO_RECVSTATE_DELETEOK   12
... ...
@@ -345,12 +345,12 @@ int mexec_send(mcomm *c, int n, int sync)
345 345
           }
346 346
         }
347 347
         if(!t){
348
-          cprintf(0, c, "%s is not contained in members\r\n", optarg);
348
+          cprintf(0, c, "%s is not contained in members\n", optarg);
349 349
           return(0);
350 350
         }
351 351
         break;
352 352
       case '?':
353
-        cprintf(0, c, "invalid option -- %c\r\n", optopt);
353
+        cprintf(0, c, "invalid option -- %c\n", optopt);
354 354
         return(0); 
355 355
     }
356 356
   }
... ...
@@ -379,16 +379,16 @@ int mexec_send(mcomm *c, int n, int sync)
379 379
   /*----- help -----*/
380 380
   if(!fn){
381 381
     if(sync){
382
-      cprintf(0, c, "sync [-n] [-r] [-t host] [path]\r\n");
383
-      cprintf(0, c, "  -n  # dryrun\r\n");
384
-      cprintf(0, c, "  -r  # recursive\r\n");
385
-      cprintf(0, c, "  -t  # target host\r\n");
382
+      cprintf(0, c, "sync [-n] [-r] [-t host] [path]\n");
383
+      cprintf(0, c, "  -n  # dryrun\n");
384
+      cprintf(0, c, "  -r  # recursive\n");
385
+      cprintf(0, c, "  -t  # target host\n");
386 386
     }else{
387
-      cprintf(0, c, "send [-n] [-r] [-t host] [path]\r\n");
388
-      cprintf(0, c, "  -n  # dryrun\r\n");
389
-      cprintf(0, c, "  -r  # recursive\r\n");
390
-      cprintf(0, c, "  -D  # with delete\r\n");
391
-      cprintf(0, c, "  -t  # target host\r\n");
387
+      cprintf(0, c, "send [-n] [-r] [-t host] [path]\n");
388
+      cprintf(0, c, "  -n  # dryrun\n");
389
+      cprintf(0, c, "  -r  # recursive\n");
390
+      cprintf(0, c, "  -D  # with delete\n");
391
+      cprintf(0, c, "  -t  # target host\n");
392 392
     }
393 393
     return(0);
394 394
   }
... ...
@@ -515,12 +515,12 @@ int mexec_check(mcomm *c, int n)
515 515
           if(!strcmp(t->hostname, optarg))
516 516
             break;
517 517
         if(!t){
518
-          cprintf(0, c, "%s is not contained in members\r\n", optarg);
518
+          cprintf(0, c, "%s is not contained in members\n", optarg);
519 519
           return(0);
520 520
         }
521 521
         break;
522 522
       case '?':
523
-        cprintf(0, c, "invalid option -- %c\r\n", optopt);
523
+        cprintf(0, c, "invalid option -- %c\n", optopt);
524 524
         return(0); 
525 525
     }
526 526
   }
... ...
@@ -539,9 +539,9 @@ int mexec_check(mcomm *c, int n)
539 539
 
540 540
   /*----- help -----*/
541 541
   if(!fn){
542
-    cprintf(0, c,"usage: check [-t host] [-r] [path]\r\n");
543
-    cprintf(0, c, "  -r  # dir recursive\r\n");
544
-    cprintf(0, c, "  -t  # target host\r\n");
542
+    cprintf(0, c,"usage: check [-t host] [-r] [path]\n");
543
+    cprintf(0, c, "  -r  # dir recursive\n");
544
+    cprintf(0, c, "  -t  # target host\n");
545 545
     return(0);
546 546
   }
547 547
 
... ...
@@ -566,8 +566,8 @@ int mexec_check(mcomm *c, int n)
566 566
   /*----- open -----*/
567 567
   m->fd = open(m->fn, O_RDONLY);
568 568
   if(m->fd == -1){
569
-	  lprintf(0, "%s: file open error %s\n", __func__, m->fn);
570
-    cprintf(0, c, "file open error: %s\r\n", m->fn);
569
+	  lprintf(0, "%s: file open error (%s) %s\n", __func__, strerror(errno), m->fn);
570
+    cprintf(0, c, "error: file open error (%s) %s\n", strerror(errno), m->fn);
571 571
     mfdel(m);
572 572
     return(0);
573 573
   }
... ...
@@ -579,8 +579,8 @@ int mexec_check(mcomm *c, int n)
579 579
   close(m->fd);
580 580
   m->fd = -1;
581 581
   if(r == -1){
582
-	  lprintf(0, "%s: file read error %s\n", __func__, m->fn);
583
-    cprintf(0, c, "error: file read error %s\n", m->fn);
582
+	  lprintf(0, "%s: file read error (%s) %s\n", __func__, strerror(errno), m->fn);
583
+    cprintf(0, c, "error: file read error (%s) %s\n", strerror(errno), m->fn);
584 584
     mfdel(m);
585 585
     return(0);
586 586
   }
... ...
@@ -627,12 +627,12 @@ int mexec_dsync(mcomm *c, int n)
627 627
           if(!strcmp(t->hostname, optarg))
628 628
             break;
629 629
         if(!t){
630
-          cprintf(0, c, "%s is not contained in members\r\n", optarg);
630
+          cprintf(0, c, "%s is not contained in members\n", optarg);
631 631
           return(0);
632 632
         }
633 633
         break;
634 634
       case '?':
635
-        cprintf(0, c, "invalid option -- %c\r\n", optopt);
635
+        cprintf(0, c, "invalid option -- %c\n", optopt);
636 636
         return(0); 
637 637
     }
638 638
   }
... ...
@@ -652,10 +652,10 @@ int mexec_dsync(mcomm *c, int n)
652 652
 
653 653
   /*----- help -----*/
654 654
   if(c->argc[n]<2){
655
-    cprintf(0, c, "dsync [-r] [-t host] [-n] [path]\r\n");
656
-    cprintf(0, c, "  -r  # recursive\r\n");
657
-    cprintf(0, c, "  -t  # target host\r\n");
658
-    cprintf(0, c, "  -n  # dryrun\r\n");
655
+    cprintf(0, c, "dsync [-r] [-t host] [-n] [path]\n");
656
+    cprintf(0, c, "  -r  # recursive\n");
657
+    cprintf(0, c, "  -t  # target host\n");
658
+    cprintf(0, c, "  -n  # dryrun\n");
659 659
     return(0);
660 660
   }
661 661
 
... ...
@@ -22,7 +22,7 @@ static mfile *mrecv_mfdel(mfile *m)
22 22
   if(m->fd != -1){
23 23
     close(m->fd);
24 24
     m->fd = -1;
25
-    if(S_ISREG(m->fs.st_mode)){
25
+    if(!S_ISLNK(m->fs.st_mode) && S_ISREG(m->fs.st_mode)){
26 26
       mremove(moption.base_dir, m->tn);
27 27
     }
28 28
   }
... ...
@@ -69,11 +69,12 @@ static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr)
69 69
   socklen_t addr_len;
70 70
 
71 71
   while(1){
72
+    if(!loop_flag){
73
+      return(-1);
74
+    }
72 75
     addr_len = sizeof(struct sockaddr_in);
73 76
     recvsize = recvfrom(s, data, sizeof(mdata), 0, (struct sockaddr *)addr, &addr_len);
74
-    if(recvsize != -1){
75
-      break;
76
-    }else{
77
+    if(recvsize == -1){
77 78
       if(errno == EAGAIN){
78 79
         return(-1);
79 80
       }
... ...
@@ -84,29 +85,31 @@ static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr)
84 84
         return(-1);
85 85
       }
86 86
     }
87
+    if(recvsize < sizeof(data->head)){
88
+      lprintf(0, "%s: recv head size error\n", __func__);
89
+      return(-1);
90
+    }
91
+    data->head.szdata = ntohs(data->head.szdata);
92
+    data->head.flags  = ntohs(data->head.flags);
93
+    data->head.reqid  = ntohl(data->head.reqid);
94
+    data->head.seqno  = ntohl(data->head.seqno);
95
+    data->head.maddr  = ntohl(data->head.maddr);
96
+    data->head.mport  = ntohs(data->head.mport);
97
+    data->head.error  = ntohl(data->head.error);
98
+    if(data->head.maddr != moption.maddr.sin_addr.s_addr){
99
+      continue; /* other group packet */
100
+    }
101
+    if(data->head.mport != moption.maddr.sin_port){
102
+      continue; /* other group packet */
103
+    }
104
+    if(data->head.vproto != PROTOCOL_VERSION){
105
+      continue; /* other protocol */
106
+    }
107
+    if(!mrecv_decrypt(data, addr)){
108
+      break;
109
+    }
87 110
   }
88
-  if(recvsize < sizeof(data->head)){
89
-    lprintf(0, "%s: recv head size error\n", __func__);
90
-    return(-1);
91
-  }
92
-
93
-  data->head.szdata = ntohs(data->head.szdata);
94
-  data->head.flags  = ntohs(data->head.flags);
95
-  data->head.reqid  = ntohl(data->head.reqid);
96
-  data->head.seqno  = ntohl(data->head.seqno);
97
-  data->head.maddr  = ntohl(data->head.maddr);
98
-  data->head.mport  = ntohs(data->head.mport);
99
-  data->head.error  = ntohl(data->head.error);
100
-  if(data->head.maddr != moption.maddr.sin_addr.s_addr){
101
-    return(-1);
102
-  }
103
-  if(data->head.mport != moption.maddr.sin_port){
104
-    return(-1);
105
-  }
106
-  if(data->head.vproto != PROTOCOL_VERSION){
107
-    return(-1);
108
-  }
109
-  return(mrecv_decrypt(data, addr));
111
+  return(0);
110 112
 }
111 113
 
112 114
 /******************************************************************
... ...
@@ -247,8 +250,6 @@ static void mrecv_ack_dsync(mdata *data, struct sockaddr_in *addr)
247 247
   mhost *t;
248 248
   mfile *m;
249 249
 
250
-  lprintf(9, "%s: rid=%d %s\n", __func__, data->head.reqid, strmstate(data));
251
-
252 250
   if(data->head.nstate == MAKUO_RECVSTATE_CLOSE){
253 251
     mkreq(data, addr, MAKUO_SENDSTATE_LAST);
254 252
   }
... ...
@@ -300,7 +301,6 @@ static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr)
300 300
   uint16_t len;
301 301
   char path[PATH_MAX];
302 302
 
303
-  lprintf(9, "%s: rid=%d %s\n", __func__, data->head.reqid, strmstate(data));
304 303
   if(mrecv_ack_search(&t, &m, data, addr)){
305 304
     return;
306 305
   }
... ...
@@ -525,19 +525,10 @@ static void mrecv_req_send_open(mfile *m, mdata *r)
525 525
       }
526 526
     }
527 527
   }
528
-  mfile *a = mfins(0);
529
-  if(!a){
528
+  if(!mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)){
530 529
     lprintf(0, "%s: out of momory\n", __func__);
531 530
     return;
532 531
   }
533
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
534
-  a->mdata.head.opcode = r->head.opcode;
535
-  a->mdata.head.reqid  = r->head.reqid;
536
-  a->mdata.head.seqno  = r->head.seqno;
537
-  a->mdata.head.ostate = m->mdata.head.ostate;
538
-  a->mdata.head.nstate = m->mdata.head.nstate;
539
-  a->mdata.head.error  = m->mdata.head.error;
540
-  memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
541 532
 }
542 533
 
543 534
 static void mrecv_req_send_mark(mfile *m, mdata *r)
... ...
@@ -548,68 +539,35 @@ static void mrecv_req_send_mark(mfile *m, mdata *r)
548 548
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
549 549
     return;
550 550
   }
551
-
552
-  for(a=mftop[0];a;a=a->next){
553
-    if(a->mdata.head.flags & MAKUO_FLAG_ACK){
554
-      if(a->mdata.head.reqid != r->head.reqid){
555
-        continue;
556
-      }
557
-      if(a->mdata.head.opcode != r->head.opcode){
558
-        continue;
559
-      }
560
-      if(a->mdata.head.nstate != MAKUO_RECVSTATE_MARK){
561
-        continue;
562
-      }
563
-      if(memcmp(&(a->addr), &(m->addr), sizeof(a->addr))){
564
-        continue;
565
-      }
566
-      return;
567
-    }
568
-  }
569
-
570
-  a = mfins(0);
571
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
572
-  a->mdata.head.flags |= MAKUO_FLAG_FMARK;
573
-  a->mdata.head.opcode = r->head.opcode;
574
-  a->mdata.head.reqid  = r->head.reqid;
575
-  a->mdata.head.seqno  = r->head.seqno;
576
-  a->mdata.head.ostate = m->mdata.head.nstate;
577
-  a->mdata.head.nstate = MAKUO_RECVSTATE_MARK;
578
-  a->mdata.head.szdata = 0;
579
-  memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
580 551
   if(m->mdata.head.seqno < m->seqnomax){
581 552
     seq_addmark(m, m->mdata.head.seqno, m->seqnomax);
582 553
     m->mdata.head.seqno = m->seqnomax;
583 554
   }
584 555
   m->lickflag = 1;
585
-  
556
+  a = mkack(&(m->mdata),&(m->addr),MAKUO_RECVSTATE_MARK);
557
+  if(!a){
558
+    lprintf(0, "%s: out of momory\n", __func__);
559
+    return;
560
+  }
561
+  if(a->mdata.head.szdata){
562
+    return;
563
+  }
586 564
   for(mm=m->mark;mm;mm=mm->next){
587 565
     if(data_safeset32(&(a->mdata), mm->l)){
588 566
       break;
589 567
     }
590 568
     if(data_safeset32(&(a->mdata), mm->h)){
591 569
       a->mdata.head.szdata -= sizeof(uint32_t);
570
+      break;
592 571
     }
593 572
   }
594 573
 }
595 574
 
596 575
 static void mrecv_req_send_data_write_error(mfile *m, mdata *r)
597 576
 {
598
-  mfile *a = mfins(0);
599
-  if(!a){
577
+  if(!mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)){
600 578
     lprintf(0, "%s: out of momory\n", __func__);
601
-    return;
602 579
   }
603
-
604
-  /*----- write error notlfy -----*/
605
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
606
-  a->mdata.head.opcode = r->head.opcode;
607
-  a->mdata.head.reqid  = r->head.reqid;
608
-  a->mdata.head.seqno  = r->head.seqno;
609
-  a->mdata.head.ostate = m->mdata.head.ostate;
610
-  a->mdata.head.nstate = m->mdata.head.nstate;
611
-  a->mdata.head.szdata = 0;
612
-  memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
613 580
 }
614 581
 
615 582
 static void mrecv_req_send_data_write(mfile *m, mdata *r)
... ...
@@ -644,23 +602,15 @@ static void mrecv_req_send_data_write(mfile *m, mdata *r)
644 644
 
645 645
 static void mrecv_req_send_data_retry(mfile *m, mdata *r)
646 646
 {
647
-  lprintf(3, "%s: markcount=%04u recv=%06u size=%06u %s\n",
648
-    __func__,
649
-    m->markcount, 
650
-    m->recvcount, 
651
-    m->seqnomax,  
652
-    m->fn);
653
-
654 647
   mmark *mm;
655
-  mfile *a = mfins(0);
656
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
657
-  a->mdata.head.opcode = r->head.opcode;
658
-  a->mdata.head.reqid  = r->head.reqid;
659
-  a->mdata.head.seqno  = r->head.seqno;
660
-  a->mdata.head.ostate = m->mdata.head.nstate;
661
-  a->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
662
-  a->mdata.head.szdata = 0;
663
-  memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
648
+  mfile *a = mkack(&(m->mdata), &(m->addr), MAKUO_RECVSTATE_OPEN);
649
+  if(!a){
650
+    lprintf(0, "%s: out of momory\n", __func__);
651
+    return;
652
+  }
653
+  if(a->mdata.head.szdata){
654
+    return;
655
+  }
664 656
   data_safeset32(&(a->mdata), m->mdata.head.seqno);
665 657
   data_safeset32(&(a->mdata), r->head.seqno);
666 658
   for(mm=m->mark;mm;mm=mm->next){
... ...
@@ -702,7 +652,6 @@ static void mrecv_req_send_data(mfile *m, mdata *r)
702 702
 
703 703
 static void mrecv_req_send_close(mfile *m, mdata *r)
704 704
 {
705
-  mfile  *a;
706 705
   struct stat fs;
707 706
   struct utimbuf mftime;
708 707
   char  fpath[PATH_MAX];
... ...
@@ -771,23 +720,14 @@ static void mrecv_req_send_close(mfile *m, mdata *r)
771 771
       return;
772 772
   }
773 773
 
774
-  a = mfins(0);
775
-  if(!a){
774
+  if(!mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)){
776 775
     lprintf(0, "%s: out of memory\n", __func__);
777
-  }else{
778
-    a->mdata.head.flags |= MAKUO_FLAG_ACK;
779
-    a->mdata.head.opcode = r->head.opcode;
780
-    a->mdata.head.reqid  = r->head.reqid;
781
-    a->mdata.head.seqno  = r->head.seqno;
782
-    a->mdata.head.ostate = m->mdata.head.ostate;
783
-    a->mdata.head.nstate = m->mdata.head.nstate;
784
-    memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
785 776
   }
786 777
 }
787 778
 
788 779
 static void mrecv_req_send_last(mfile *m, mdata *r)
789 780
 {
790
-  mkack(r, &(m->addr), MAKUO_RECVSTATE_IGNORE);
781
+  mkack(r, &(m->addr), MAKUO_RECVSTATE_LAST);
791 782
   mrecv_mfdel(m);
792 783
 }
793 784
 
... ...
@@ -795,12 +735,10 @@ static void mrecv_req_send_next(mfile *m, mdata *r)
795 795
 {
796 796
   switch(r->head.nstate){
797 797
     case MAKUO_SENDSTATE_STAT:
798
-      lprintf(9,"%s: %s/%s %s\n", __func__, strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->fn);
799 798
       mrecv_req_send_stat(m, r);
800 799
       break;
801 800
 
802 801
     case MAKUO_SENDSTATE_OPEN:
803
-      lprintf(9,"%s: %s/%s %s\n", __func__, strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->fn);
804 802
       mrecv_req_send_open(m, r);
805 803
       break;
806 804
 
... ...
@@ -809,23 +747,18 @@ static void mrecv_req_send_next(mfile *m, mdata *r)
809 809
       break;
810 810
 
811 811
     case MAKUO_SENDSTATE_MARK:
812
-      lprintf(9,"%s: %s/%s rid=%d seqno=%d max=%d cnt=%d %s\n", __func__,
813
-        strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->mdata.head.reqid, m->mdata.head.seqno, m->seqnomax, m->markcount, m->fn);
814 812
       mrecv_req_send_mark(m, r);
815 813
       break;
816 814
 
817 815
     case MAKUO_SENDSTATE_CLOSE:
818
-      lprintf(9,"%s: %s/%s %s\n", __func__, strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->fn);
819 816
       mrecv_req_send_close(m, r);
820 817
       break;
821 818
 
822 819
     case MAKUO_SENDSTATE_LAST:
823
-      lprintf(9,"%s: %s/%s %s\n", __func__, strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->fn);
824 820
       mrecv_req_send_last(m, r);
825 821
       break;
826 822
 
827 823
     case MAKUO_SENDSTATE_BREAK:
828
-      lprintf(9,"%s: %s/%s %s\n", __func__, strsstate(r->head.nstate), strrstate(m->mdata.head.nstate), m->fn);
829 824
       mrecv_req_send_break(m, r);
830 825
       break;
831 826
   }
... ...
@@ -850,12 +783,12 @@ static mfile *mrecv_req_send_create(mdata *data, struct sockaddr_in *addr)
850 850
     return(NULL);
851 851
   }
852 852
 
853
-  /* copy header */
853
+  /* copy header and addr */
854 854
   memcpy(&(m->addr), addr, sizeof(m->addr));
855 855
   memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head));
856
-  data->p = data->data;
857 856
 
858 857
   /* read mstat */
858
+  data->p = data->data;
859 859
   data_safeget(data, &fs, sizeof(fs));
860 860
 
861 861
   /* stat = mstat */
... ...
@@ -914,11 +847,14 @@ static void mrecv_req_md5_open(mfile *m, mdata *data, struct sockaddr_in *addr)
914 914
 {
915 915
   int    r;
916 916
   int    l;
917
-  mfile *a;
918 917
   mhash *h;
919 918
 
920 919
   if(!m){
921 920
     m = mfadd(1);
921
+    if(!m){
922
+      lprintf(0,"%s: out of memory\n", __func__);
923
+      return;
924
+    }
922 925
     memcpy(&(m->addr), addr, sizeof(m->addr));
923 926
     memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head));
924 927
     h = (mhash *)(data->data);
... ...
@@ -946,27 +882,17 @@ static void mrecv_req_md5_open(mfile *m, mdata *data, struct sockaddr_in *addr)
946 946
       }
947 947
     }
948 948
   }
949
-  a=mfadd(0);
950
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
951
-  a->mdata.head.opcode = m->mdata.head.opcode;
952
-  a->mdata.head.reqid  = m->mdata.head.reqid;
953
-  a->mdata.head.seqno  = 0;
954
-  a->mdata.head.szdata = 0;
955
-  a->mdata.head.nstate = m->mdata.head.nstate;
956
-  memcpy(&(a->addr), addr, sizeof(a->addr));
957 949
   mtimeget(&(m->lastrecv));
950
+  if(!mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)){
951
+    lprintf(0,"%s: out of memory\n", __func__);
952
+  }
958 953
 }
959 954
 
960 955
 static void mrecv_req_md5_close(mfile *m, mdata *data, struct sockaddr_in *addr)
961 956
 {
962
-  mfile *a = mfadd(0);
963
-  a->mdata.head.flags |= MAKUO_FLAG_ACK;
964
-  a->mdata.head.opcode = data->head.opcode;
965
-  a->mdata.head.reqid  = data->head.reqid;
966
-  a->mdata.head.szdata = 0;
967
-  a->mdata.head.seqno  = 0;
968
-  a->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
969
-  memcpy(&(a->addr), addr, sizeof(a->addr));
957
+  if(!mkack(data, addr, MAKUO_RECVSTATE_CLOSE)){
958
+    lprintf(0,"%s: out of memory\n", __func__);
959
+  }
970 960
   mrecv_mfdel(m);
971 961
 }
972 962
 
... ...
@@ -977,14 +903,7 @@ static void mrecv_req_md5_close(mfile *m, mdata *data, struct sockaddr_in *addr)
977 977
  */
978 978
 static void mrecv_req_md5(mdata *data, struct sockaddr_in *addr)
979 979
 {
980
-  mfile *m = mftop[1];
981
-  while(m){
982
-    if(!memcmp(&m->addr, addr, sizeof(m->addr)) && m->mdata.head.reqid == data->head.reqid){
983
-      mtimeget(&m->lastrecv);
984
-      break;
985
-    }
986
-    m = m->next;
987
-  }
980
+  mfile *m = mrecv_req_search(data, addr);
988 981
   switch(data->head.nstate){
989 982
     case MAKUO_SENDSTATE_OPEN:
990 983
       mrecv_req_md5_open(m, data, addr);
... ...
@@ -1105,7 +1024,6 @@ static void dsync_scan(int fd, char *base, int recurs, excludeitem *e)
1105 1105
 
1106 1106
 static void mrecv_req_dsync_open(mfile *m, mdata *data, struct sockaddr_in *addr)
1107 1107
 {
1108
-  lprintf(9, "%s:\n", __func__);
1109 1108
   mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1110 1109
   if(m){
1111 1110
     return;
... ...
@@ -1130,7 +1048,6 @@ static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr
1130 1130
   char path[PATH_MAX];
1131 1131
   uint16_t len;
1132 1132
 
1133
-  lprintf(9, "%s:\n", __func__);
1134 1133
   mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1135 1134
   if(m->mdata.head.seqno >= data->head.seqno){
1136 1135
     return;
... ...
@@ -1195,7 +1112,6 @@ static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr
1195 1195
 
1196 1196
 static void mrecv_req_dsync_last(mfile *m, mdata *data, struct sockaddr_in *addr)
1197 1197
 {
1198
-  lprintf(9, "%s:\n", __func__);
1199 1198
   if(!m){
1200 1199
     return;
1201 1200
   }
... ...
@@ -1208,7 +1124,6 @@ static void mrecv_req_dsync_last(mfile *m, mdata *data, struct sockaddr_in *addr
1208 1208
 
1209 1209
 static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *addr)
1210 1210
 {
1211
-  lprintf(9, "%s:\n", __func__);
1212 1211
   if(m){
1213 1212
     if(m->link){
1214 1213
       m->link->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
... ...
@@ -1224,7 +1139,6 @@ static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *add
1224 1224
  */
1225 1225
 static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr)
1226 1226
 {
1227
-  lprintf(9, "%s: rid=%d %s %s\n", __func__, data->head.reqid, stropcode(data), strmstate(data));
1228 1227
   mfile *m = mrecv_req_search(data, addr);
1229 1228
   switch(data->head.nstate){
1230 1229
     case MAKUO_SENDSTATE_OPEN:
... ...
@@ -1253,7 +1167,6 @@ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr)
1253 1253
   mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1254 1254
   char path[PATH_MAX];
1255 1255
 
1256
-  lprintf(9, "%s: rid=%d\n", __func__, data->head.reqid);
1257 1256
   if(!a){
1258 1257
     lprintf(0, "%s: arror ack can't create\n", __func__);
1259 1258
     return;
... ...
@@ -1264,7 +1177,6 @@ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr)
1264 1264
     len -= sizeof(uint32_t);
1265 1265
     data_safeget(data, path, len);
1266 1266
     path[len] =  0;
1267
-    lprintf(9, "%s: rid=%d %s\n", __func__, data->head.reqid, path);
1268 1267
     if(lstat(path, &(a->fs)) == -1 && errno == ENOENT){
1269 1268
       data_safeset16(&(a->mdata), len + sizeof(uint32_t));
1270 1269
       data_safeset32(&(a->mdata), 0);
... ...
@@ -1300,7 +1212,6 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1300 1300
   mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1301 1301
   char path[PATH_MAX];
1302 1302
 
1303
-  lprintf(9, "%s:\n", __func__);
1304 1303
   if(m){
1305 1304
     return;
1306 1305
   }
... ...
@@ -1333,7 +1244,6 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1333 1333
 
1334 1334
 static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr)
1335 1335
 {
1336
-  lprintf(9, "%s:\n", __func__);
1337 1336
   mfile *m = mrecv_req_search(data, addr);
1338 1337
   mfile *a = mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
1339 1338
   if(!m){
... ...
@@ -1344,7 +1254,6 @@ static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr)
1344 1344
 
1345 1345
 static void mrecv_req_del(mdata *data, struct sockaddr_in *addr)
1346 1346
 {
1347
-  lprintf(9, "%s: rid=%d %s %s\n", __func__, data->head.reqid, stropcode(data),  strsstate(data->head.nstate));
1348 1347
   switch(data->head.nstate){
1349 1348
     case MAKUO_SENDSTATE_OPEN: 
1350 1349
       mrecv_req_del_open(data, addr);
... ...
@@ -1398,12 +1307,10 @@ void mrecv_gc()
1398 1398
   /* file timeout */
1399 1399
   while(m){
1400 1400
     if(mtimeout(&(m->lastrecv), MAKUO_RECV_GCWAIT)){
1401
-      if(MAKUO_RECVSTATE_CLOSE != m->mdata.head.nstate){
1402
-        lprintf(0,"%s: mfile object GC state=%s %s\n",
1403
-          __func__, 
1404
-          strrstate(m->mdata.head.nstate), 
1405
-          m->fn);
1406
-      }
1401
+      lprintf(0,"%s: mfile object GC state=%s %s\n",
1402
+        __func__, 
1403
+        strrstate(m->mdata.head.nstate), 
1404
+        m->fn);
1407 1405
       m = mrecv_mfdel(m);
1408 1406
       continue;
1409 1407
     }
... ...
@@ -1440,12 +1347,6 @@ int mrecv(int s)
1440 1440
   if(mrecv_packet(s, &data, &addr) == -1){
1441 1441
     return(0);
1442 1442
   }
1443
-  lprintf(9, "%s: %s %s rid=%d seq=%d\n", 
1444
-    __func__, 
1445
-    stropcode(&data), 
1446
-    strmstate(&data),
1447
-    data.head.reqid, 
1448
-    data.head.seqno); 
1449 1443
   if(data.head.flags & MAKUO_FLAG_ACK){
1450 1444
     mrecv_ack(&data, &addr);
1451 1445
   }else{
... ...
@@ -72,52 +72,43 @@ static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
72 72
   senddata.head.error  = htonl(senddata.head.error);
73 73
   szdata += sizeof(mhead);
74 74
  
75
-  while(1){ 
75
+  while(loop_flag){ 
76 76
     FD_ZERO(&fds);
77 77
     FD_SET(moption.mcsocket, &fds);
78 78
     tv.tv_sec  = 1;
79 79
     tv.tv_usec = 0;
80 80
     if(select(1024, NULL, &fds, NULL, &tv) != 1){
81
-      if(!loop_flag){
82
-        return(-1);
83
-      }
84 81
       continue;
85 82
     }
86 83
     r = sendto(s, &senddata, szdata, 0, (struct sockaddr*)addr, sizeof(struct sockaddr_in));
87 84
     if(r == szdata){
88
-      return(1);
85
+      return(0);
89 86
     }
90 87
     if(r == -1){
91
-      if(errno == EAGAIN){
92
-        lprintf(0,"%s: EAGAIN\n", __func__);
93
-        return(-1);
94
-      }
95 88
       if(errno == EINTR){
96 89
         continue;
97
-      }else{
98
-        break;
99 90
       }
91
+      lprintf(0,"%s: send error (%s) %s %s rid=%d size=%d seqno=%d\n",
92
+        __func__,
93
+        strerror(errno), 
94
+        stropcode(data), 
95
+        strmstate(data),
96
+        data->head.reqid, 
97
+        szdata, 
98
+        data->head.seqno);
99
+      break;
100
+    }else{
101
+      lprintf(0, "%s: send size error %s %s rid=%d datasize=%d sendsize=%d seqno=%d\n",
102
+        __func__,
103
+        stropcode(data), 
104
+        strmstate(data),
105
+        data->head.reqid,
106
+        szdata, 
107
+        r, 
108
+        data->head.seqno);
109
+      break;
100 110
     }
101 111
   }
102
-  if(r != -1){
103
-    lprintf(0, "%s: send size error %s %s rid=%d datasize=%d sendsize=%d seqno=%d\n",
104
-      __func__,
105
-      stropcode(data), 
106
-      strmstate(data),
107
-      data->head.reqid,
108
-      szdata, 
109
-      r, 
110
-      data->head.seqno);
111
-    return(0);
112
-  }
113
-  lprintf(0,"%s: send error (%s) %s %s rid=%d size=%d seqno=%d\n",
114
-    __func__,
115
-    strerror(errno), 
116
-    stropcode(data), 
117
-    strmstate(data),
118
-    data->head.reqid, 
119
-    szdata, 
120
-    data->head.seqno);
121 112
   return(-1);
122 113
 }
123 114
 
... ...
@@ -180,7 +171,7 @@ static int msend_retry(mfile *m)
180 180
 /* send & free */
181 181
 static void msend_shot(int s, mfile *m)
182 182
 {
183
-  if(msend_packet(s, &(m->mdata), &(m->addr)) == 1){
183
+  if(!msend_packet(s, &(m->mdata), &(m->addr))){
184 184
     msend_mfdel(m);
185 185
   }
186 186
 }
... ...
@@ -254,7 +245,6 @@ static void msend_req_send_break_init(int s, mfile *m)
254 254
 
255 255
 static void msend_req_send_break(int s, mfile *m)
256 256
 {
257
-  lprintf(9, "%s: BREAK %s\n", __func__, m->fn);
258 257
   if(m->initstate){
259 258
     msend_req_send_break_init(s, m);
260 259
     return;
... ...
@@ -392,7 +382,6 @@ static void msend_req_send_stat(int s, mfile *m)
392 392
     msend_packet(s, &(m->mdata), &(m->addr));
393 393
     return;
394 394
   }
395
-  lprintf(9,"%s: STAT %s\n", __func__, m->fn);
396 395
   if(m->mdata.head.flags & MAKUO_FLAG_SYNC){
397 396
     msend_req_send_stat_delete_report(m);
398 397
     m->initstate = 1;
... ...
@@ -440,7 +429,6 @@ static void msend_req_send_open_init(int s, mfile *m)
440 440
 
441 441
 static void msend_req_send_open(int s, mfile *m)
442 442
 {
443
-  lprintf(9,"%s: %s\n", __func__, m->fn);
444 443
   if(m->initstate){
445 444
     msend_req_send_open_init(s, m);
446 445
     return;
... ...
@@ -449,7 +437,6 @@ static void msend_req_send_open(int s, mfile *m)
449 449
     msend_packet(s, &(m->mdata), &(m->addr));
450 450
     return;
451 451
   }
452
-  lprintf(9, "%s: OPEN %s\n", __func__, m->fn);
453 452
   if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
454 453
     m->sendwait = 1;
455 454
     ack_clear(m, MAKUO_RECVSTATE_UPDATE);
... ...
@@ -480,7 +467,6 @@ static void msend_req_send_markdata(int s, mfile *m)
480 480
   m->mdata.head.seqno = seq_getmark(m);
481 481
   offset = m->mdata.head.seqno;
482 482
   offset *= MAKUO_BUFFER_SIZE;
483
-  lprintf(9, "%s: block send retry seqno=%u count=%u\n", __func__, m->mdata.head.seqno, m->markcount);
484 483
   lseek(m->fd, offset, SEEK_SET);
485 484
   r = read(m->fd, m->mdata.data, MAKUO_BUFFER_SIZE);
486 485
   if(r>0){
... ...
@@ -569,7 +555,6 @@ static void msend_req_send_mark(int s, mfile *m)
569 569
     msend_req_send_mark_init(s, m);
570 570
     return;
571 571
   }
572
-  lprintf(9, "%s: MARK mark=%d %s\n", __func__, m->markcount, m->fn);
573 572
   m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
574 573
 }
575 574
 
... ...
@@ -681,7 +666,6 @@ static void msend_req_md5_open_init(int s, mfile *m)
681 681
 static void msend_req_md5_open(int s, mfile *m)
682 682
 {
683 683
   if(m->initstate){
684
-    lprintf(9,"%s: %s\n", __func__, m->fn);
685 684
     msend_req_md5_open_init(s, m);
686 685
     return;
687 686
   }
... ...
@@ -713,7 +697,6 @@ static void msend_req_md5_close(int s, mfile *m)
713 713
     msend_packet(s, &(m->mdata), &(m->addr));
714 714
     return;
715 715
   }
716
-  lprintf(9,"%s: %s\n", __func__, m->fn);
717 716
   msend_mfdel(m);
718 717
 }
719 718
 
... ...
@@ -732,7 +715,6 @@ static void msend_req_md5(int s, mfile *m)
732 732
 
733 733
 static void msend_req_dsync_open(int s, mfile *m)
734 734
 {
735
-  lprintf(9, "%s: init=%d wait=%d\n", __func__, m->initstate, m->sendwait);
736 735
   if(m->initstate){
737 736
     m->initstate = 0;
738 737
     m->sendwait  = 1;
... ...
@@ -769,19 +751,16 @@ static void msend_req_dsync_data_init(int s, mfile *m)
769 769
     }
770 770
     data_safeset16(&(m->mdata), len);
771 771
     data_safeset(&(m->mdata), e->pattern, len);
772
-    lprintf(9, "%s: exclude add %s\n", __func__, e->pattern);
773 772
     m->mdata.head.seqno++;
774 773
     e = e->next;
775 774
   }
776 775
   if(m->mdata.head.szdata == 0){
777 776
     m->mdata.head.seqno++;
778 777
   }
779
-  lprintf(9,"%s: rid=%d seqno=%d size=%d\n", __func__, m->mdata.head.reqid, m->mdata.head.seqno, m->mdata.head.szdata);
780 778
 }
781 779
 
782 780
 static void msend_req_dsync_data(int s, mfile *m)
783 781
 {
784
-  lprintf(9, "%s: init=%d wait=%d\n", __func__, m->initstate, m->sendwait);
785 782
   if(m->initstate){
786 783
     msend_req_dsync_data_init(s, m);
787 784
     msend_packet(s, &(m->mdata), &(m->addr));
... ...
@@ -799,7 +778,6 @@ static void msend_req_dsync_data(int s, mfile *m)
799 799
 
800 800
 static void msend_req_dsync_close(int s, mfile *m)
801 801
 {
802
-  lprintf(9, "%s: init=%d wait=%d\n", __func__, m->initstate, m->sendwait);
803 802
   if(m->initstate){
804 803
     m->sendwait  = 1;
805 804
     m->initstate = 0;
... ...
@@ -814,7 +792,6 @@ static void msend_req_dsync_close(int s, mfile *m)
814 814
 
815 815
 static void msend_req_dsync_break(int s, mfile *m)
816 816
 {
817
-  lprintf(9, "%s: init=%d wait=%d\n", __func__, m->initstate, m->sendwait);
818 817
   if(m->initstate){
819 818
     m->initstate = 0;
820 819
     m->sendwait  = 1;
... ...
@@ -832,7 +809,6 @@ static void msend_req_dsync_break(int s, mfile *m)
832 832
 /*----- dsync -----*/
833 833
 static void msend_req_dsync(int s, mfile *m)
834 834
 {
835
-  lprintf(9, "%s: rid=%d %s %s\n", __func__, m->mdata.head.reqid, stropcode(&(m->mdata)), strmstate(&(m->mdata)));
836 835
   if(m->mdata.head.nstate != MAKUO_SENDSTATE_LAST){
837 836
     if(!m->comm){
838 837
       if(m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK){
... ...
@@ -862,7 +838,6 @@ static void msend_req_dsync(int s, mfile *m)
862 862
 
863 863
 static void msend_req_del_mark(int s, mfile *m)
864 864
 {
865
-  lprintf(9, "%s:\n", __func__);
866 865
   mfile *d = m->link; /* dsync object */
867 866
   if(m->initstate){
868 867
     m->initstate = 0;
... ...
@@ -916,7 +891,6 @@ static void msend_req_del_stat(int s, mfile *m)
916 916
     return;
917 917
   }
918 918
 
919
-  lprintf(9, "%s:\n", __func__);
920 919
   d = mkreq(&(m->mdata), &(m->addr), MAKUO_SENDSTATE_OPEN);
921 920
   d->mdata.head.flags = m->mdata.head.flags;
922 921
   d->mdata.head.reqid = getrid();
... ...
@@ -931,7 +905,6 @@ static void msend_req_del_stat(int s, mfile *m)
931 931
   if(len){
932 932
     data_safeset16(&(d->mdata), len);
933 933
     data_safeset(&(d->mdata), path, len);
934
-    lprintf(9, "%s: rid=%d %s\n", __func__, d->mdata.head.reqid, path + sizeof(uint32_t));
935 934
   }
936 935
 
937 936
   while(1){
... ...
@@ -965,19 +938,16 @@ static void msend_req_del_stat(int s, mfile *m)
965 965
     }
966 966
     data_safeset16(&(d->mdata), len);
967 967
     data_safeset(&(d->mdata), path, len);
968
-    lprintf(9, "%s: rid=%d %s\n", __func__, d->mdata.head.reqid, path + sizeof(uint32_t));
969 968
   }
970 969
 }
971 970
 
972 971
 static void msend_req_del_last(int s, mfile *m)
973 972
 {
974
-  lprintf(9, "%s:\n", __func__);
975 973
   msend_mfdel(m);
976 974
 }
977 975
 
978 976
 static void msend_req_del_break(int s, mfile *m)
979 977
 {
980
-  lprintf(9, "%s:\n", __func__);
981 978
   if(m->link){
982 979
     m->link->lastrecv.tv_sec = 1;
983 980
   }
... ...
@@ -986,7 +956,6 @@ static void msend_req_del_break(int s, mfile *m)
986 986
 
987 987
 static void msend_req_del_open(int s, mfile *m)
988 988
 {
989
-  lprintf(9, "%s:\n", __func__);
990 989
   if(m->initstate){
991 990
     m->initstate = 0;
992 991
     m->sendwait  = 1;
... ...
@@ -1003,7 +972,6 @@ static void msend_req_del_open(int s, mfile *m)
1003 1003
 
1004 1004
 static void msend_req_del_data(int s, mfile *m)
1005 1005
 {
1006
-  lprintf(9, "%s:\n", __func__);
1007 1006
   if(m->initstate){
1008 1007
     m->initstate = 0;
1009 1008
     m->sendwait  = 1;
... ...
@@ -1022,7 +990,6 @@ static void msend_req_del_data(int s, mfile *m)
1022 1022
 
1023 1023
 static void msend_req_del_close(int s, mfile *m)
1024 1024
 {
1025
-  lprintf(9, "%s:\n", __func__);
1026 1025
   if(m->initstate){
1027 1026
     m->initstate = 0;
1028 1027
     m->sendwait  = 1;
... ...
@@ -1081,13 +1048,6 @@ static void msend_req_ping(int s, mfile *m)
1081 1081
 /*----- send request -----*/
1082 1082
 static void msend_req(int s, mfile *m)
1083 1083
 {
1084
-  lprintf(9, "%s: rid=%d %s %s init=%d wait=%d\n",
1085
-    __func__, 
1086
-    m->mdata.head.reqid, 
1087
-    stropcode(&(m->mdata)), 
1088
-    strmstate(&(m->mdata)), 
1089
-    m->initstate, 
1090
-    m->sendwait);
1091 1084
   switch(m->mdata.head.opcode){
1092 1085
     case MAKUO_OP_PING:
1093 1086
       msend_req_ping(s, m);
... ...
@@ -1121,13 +1081,6 @@ void msend(int s, mfile *m)
1121 1121
   if(msend_retry(m)){
1122 1122
     return;
1123 1123
   }
1124
-  lprintf(9, "%s: %s %s %s rid=%d seq=%d\n", 
1125
-    __func__, 
1126
-    strackreq(&(m->mdata)),
1127
-    stropcode(&(m->mdata)), 
1128
-    strmstate(&(m->mdata)),
1129
-    m->mdata.head.reqid, 
1130
-    m->mdata.head.seqno); 
1131 1124
   mtimeget(&m->lastsend);
1132 1125
   if(m->mdata.head.flags & MAKUO_FLAG_ACK){
1133 1126
     msend_ack(s, m); /* source node task */