Masanobu Yasui authored on 2009/05/18 02:33:24
Showing 7 changed files
... ...
@@ -1,6 +1,8 @@
1 1
 1.1.4:
2 2
  - log format change. add [error] for error log
3 3
  - latency improvement.
4
+ - fix: terminate process when file close error.
5
+ -
4 6
 
5 7
 1.1.3:
6 8
  - sort msync --members by hostname
... ...
@@ -792,8 +792,9 @@ int linkcmp(mfile *m)
792 792
 
793 793
 int statcmp(struct stat *s1, struct stat *s2)
794 794
 {
795
-  if(s1->st_mtime != s2->st_mtime)
795
+  if(s1->st_mtime != s2->st_mtime){
796 796
     return(MAKUO_RECVSTATE_UPDATE);
797
+  }
797 798
   if(!geteuid() || !getegid()){
798 799
     if(s1->st_uid != s2->st_uid){
799 800
       return(MAKUO_RECVSTATE_UPDATE);
... ...
@@ -16,7 +16,7 @@ void recv_timeout(mfile *m)
16 16
     for(t=members;t;t=t->next){
17 17
       r = get_hoststate(t, m);
18 18
       if(*r == MAKUO_RECVSTATE_NONE){
19
-        lprintf(0, "%s: %s(%s)\n", __func__, inet_ntoa(t->ad), t->hostname);
19
+        lprintf(0, "[error] %s: %s(%s)\n", __func__, inet_ntoa(t->ad), t->hostname);
20 20
         member_del_message(t, "receive time out");
21 21
         member_del(t);
22 22
         break;
... ...
@@ -92,19 +92,17 @@ int mcomm_accept(mcomm *c, fd_set *fds, int s)
92 92
 }
93 93
 
94 94
 void mcomm_check(mcomm *c){
95
-  int i, j;
95
+  int    i;
96 96
   mfile *m;
97 97
   for(i=0;i<MAX_COMM;i++){
98
-    if(c[i].fd[1] == -1){
98
+    if(c[i].working && !c[i].cpid && (c[i].fd[1] == -1)){
99 99
       for(m=mftop[0];m;m=m->next){
100 100
         if(m->comm == &c[i]){
101
-          break;
101
+          break; /* working */
102 102
         }
103 103
       }
104 104
       if(!m){
105
-        if(c[i].working && !c[i].cpid){
106
-          workend(&c[i]);
107
-        }
105
+        workend(&c[i]);
108 106
       }
109 107
     }
110 108
   }
... ...
@@ -53,7 +53,7 @@
53 53
 #define MAKUO_MCAST_PORT  5000
54 54
 
55 55
 /*----- timeout -----*/
56
-#define MAKUO_SEND_TIMEOUT  501    /* 再送間隔(ms)                                 */
56
+#define MAKUO_SEND_TIMEOUT  500    /* 再送間隔(ms)                                 */
57 57
 #define MAKUO_SEND_RETRYCNT 120    /* 再送回数                                     */
58 58
 #define MAKUO_PONG_TIMEOUT  300000 /* メンバから除外するまでの時間(ms)             */
59 59
 #define MAKUO_PONG_INTERVAL 45000  /* PONG送信間隔(ms)                             */
... ...
@@ -547,10 +547,11 @@ static void mrecv_req_send_mark(mfile *m, mdata *r)
547 547
   m->lickflag = 1;
548 548
   a = mkack(&(m->mdata),&(m->addr),MAKUO_RECVSTATE_MARK);
549 549
   if(!a){
550
-    lprintf(0, "%s: out of momory\n", __func__);
550
+    lprintf(0, "[error] %s: out of momory\n", __func__);
551 551
     return;
552 552
   }
553 553
   if(a->mdata.head.szdata){
554
+    msend(a);
554 555
     return;
555 556
   }
556 557
   for(mm=m->mark;mm;mm=mm->next){
... ...
@@ -576,29 +577,33 @@ static void mrecv_req_send_data_write(mfile *m, mdata *r)
576 576
   if(r->head.szdata == 0){
577 577
     return;
578 578
   }
579
-  offset = r->head.seqno;
579
+  offset  = r->head.seqno;
580 580
   offset *= MAKUO_BUFFER_SIZE;
581 581
   if(lseek(m->fd, offset, SEEK_SET) == -1){
582
-    lprintf(0, "%s: seek error (%s) seq=%u\n",
583
-      __func__, strerror(errno), (int)(r->head.seqno));
582
+    m->mdata.head.error  = errno;
584 583
     m->mdata.head.ostate = m->mdata.head.nstate;
585 584
     m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
586 585
     mrecv_req_send_data_write_error(m, r);
587
-    return; /* seek error */
586
+    lprintf(0, "[error] %s: seek error (%s) seq=%u\n",
587
+      __func__,
588
+      strerror(m->mdata.head.error), 
589
+      (int)(r->head.seqno));
590
+    return; /* lseek error */
588 591
   }
589
-  if(write(m->fd, r->data, r->head.szdata) != -1){
590
-    m->recvcount++;
591
-  }else{
592
-    lprintf(0, "%s: write error (%s) seqno=%d size=%d %s\n",
592
+  if(write(m->fd, r->data, r->head.szdata) == -1){
593
+    m->mdata.head.error  = errno;
594
+    m->mdata.head.ostate = m->mdata.head.nstate;
595
+    m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
596
+    mrecv_req_send_data_write_error(m, r);
597
+    lprintf(0, "[error] %s: write error (%s) seqno=%d size=%d %s\n",
593 598
       __func__,
594
-      strerror(errno), 
599
+      strerror(m->mdata.head.error), 
595 600
       (int)(r->head.seqno), 
596 601
       r->head.szdata,
597 602
       m->fn);
598
-    m->mdata.head.ostate = m->mdata.head.nstate;
599
-    m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
600
-    mrecv_req_send_data_write_error(m, r);
603
+    return; /* write error */
601 604
   }
605
+  m->recvcount++;
602 606
 }
603 607
 
604 608
 static void mrecv_req_send_data_retry(mfile *m, mdata *r)
... ...
@@ -664,14 +669,16 @@ static void mrecv_req_send_close(mfile *m, mdata *r)
664 664
     return;
665 665
   }
666 666
 
667
+  m->mdata.head.ostate = m->mdata.head.nstate;
667 668
   if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){
669
+    m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
670
+    msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
668 671
     return;
669 672
   }
670
- 
673
+
671 674
   mftime.actime  = m->fs.st_ctime; 
672 675
   mftime.modtime = m->fs.st_mtime;
673
-  m->mdata.head.ostate = m->mdata.head.nstate;
674
-
676
+ 
675 677
   if(S_ISLNK(m->fs.st_mode)){
676 678
     if(!mrename(moption.base_dir, m->tn, m->fn)){
677 679
       m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
... ...
@@ -863,7 +870,6 @@ static mfile *mrecv_req_send_create(mdata *data, struct sockaddr_in *addr)
863 863
 static void mrecv_req_send(mdata *data, struct sockaddr_in *addr)
864 864
 {
865 865
   mfile *m;
866
-
867 866
   if(m = mrecv_req_send_create(data, addr)){
868 867
     mtimeget(&(m->lastrecv));
869 868
     mrecv_req_send_next(m, data);
... ...
@@ -1117,7 +1123,7 @@ static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr
1117 1117
   pipe(p);
1118 1118
   pid = fork();
1119 1119
   if(pid == -1){
1120
-    lprintf(0, "%s: fork error (%s)\n", __func__, strerror(errno));
1120
+    lprintf(0, "%s: %s fork error\n", __func__, strerror(errno));
1121 1121
     close(p[0]);
1122 1122
     close(p[1]);
1123 1123
     return;
... ...
@@ -129,6 +129,7 @@ static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
129 129
 /* retry */
130 130
 static int msend_retry(mfile *m)
131 131
 {
132
+  uint32_t w;
132 133
   uint8_t *r;
133 134
   mhost   *t;
134 135
 
... ...
@@ -141,7 +142,13 @@ static int msend_retry(mfile *m)
141 141
   }
142 142
   if(m->mdata.head.opcode == MAKUO_OP_DSYNC){
143 143
     if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
144
-      return(0);
144
+      m->retrycnt--;
145
+      w = (MAKUO_SEND_RETRYCNT - m->retrycnt) * MAKUO_SEND_TIMEOUT;
146
+      if(w < 15000){
147
+        return(0);
148
+      }else{
149
+        m->retrycnt = MAKUO_SEND_RETRYCNT;
150
+      }
145 151
     }
146 152
   }
147 153
   lprintf(2, "%s: send retry count=%02d rid=%06d op=%s state=%s %s\n", 
... ...
@@ -1112,7 +1119,6 @@ void msend(mfile *m)
1112 1112
     msend_req(moption.mcsocket, m); 
1113 1113
   }
1114 1114
 }
1115
-
1116 1115
 void msend_clean()
1117 1116
 {
1118 1117
   mfile *m = mftop[0];
... ...
@@ -1,10 +1,10 @@
1 1
 Name:           makuosan
2
-Version:        1.0.0
2
+Version:        1.1.4
3 3
 Release:        1%{?dist}
4 4
 Summary:        Multicasts All-Kinds of Updating Operation for Servers on Administered Network
5 5
 
6 6
 Group:          System Environment/Daemons
7
-License:        BSD
7
+License:        GPL
8 8
 URL:            http://lab.klab.org/wiki/Makuosan
9 9
 Source0:        http://downloads.sourceforge.net/makuosan/%{name}-%{version}.tar.gz
10 10
 BuildRoot:      %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
... ...
@@ -83,6 +83,7 @@ rm -rf %{buildroot}
83 83
 
84 84
 
85 85
 %changelog
86
+* Sat May 16 2009 Masanobu Yasui <yasui0906@gmail.com> - 1.1.4
87
+
86 88
 * Thu Nov  6 2008 Naoya Nakazawa <naoya.n@gmail.com> - 1.0.0
87 89
 - Initial version
88
-