Masanobu Yasui authored on 2009/05/22 06:00:04
Showing 6 changed files
... ...
@@ -284,7 +284,7 @@ void member_del(mhost *t)
284 284
   free(t);
285 285
 }
286 286
 
287
-void member_del_message(mhost *t, char *mess)
287
+void member_del_message(int err, mhost *t, char *mess)
288 288
 {
289 289
   int i;
290 290
   mfile *m;
... ...
@@ -297,12 +297,14 @@ void member_del_message(mhost *t, char *mess)
297 297
       if(m->comm){
298 298
         if(m->comm->working){
299 299
           cprintf(0, m->comm, "error: %s: %s\n", mess, m->cmdline);
300
-          lprintf(0, "[error] %s: %s: ST=%s RC=%02d: %s\n", 
301
-            __func__, 
302
-            mess, 
303
-            strmstate(&(m->mdata)),
304
-            m->retrycnt,
305
-            m->cmdline);
300
+          if(err){
301
+            lprintf(0, "[error] %s: %s: ST=%s RC=%02d: %s\n", 
302
+              __func__, 
303
+              mess, 
304
+              strmstate(&(m->mdata)),
305
+              m->retrycnt,
306
+              m->cmdline);
307
+          }
306 308
         }
307 309
       }
308 310
     }
... ...
@@ -312,7 +314,9 @@ void member_del_message(mhost *t, char *mess)
312 312
       cprintf(0, &(moption.comm[i]), "error: %s: %s(%s)\n", mess, inet_ntoa(t->ad), t->hostname);
313 313
     }
314 314
   }
315
-  lprintf(0, "[error] %s: %s: %s(%s)\n", __func__, mess, inet_ntoa(t->ad), t->hostname);
315
+  if(err){
316
+    lprintf(0, "[error] %s: %s: %s(%s)\n", __func__, mess, inet_ntoa(t->ad), t->hostname);
317
+  }
316 318
 }
317 319
 
318 320
 mmark *markalloc()
... ...
@@ -934,24 +938,44 @@ mfile *mkack(mdata *data, struct sockaddr_in *addr, uint8_t state)
934 934
   return(a);
935 935
 }
936 936
 
937
-int atomic_read(int fd, char *buff, int size)
937
+int atomic_read(int fd, void *buff, int size, int nb)
938 938
 {
939
+  int e;
939 940
   int r;
941
+  int s;
942
+  int f;
943
+
944
+  s = size;
945
+  f = fcntl(fd, F_GETFL, 0);
946
+  if(nb){
947
+    fcntl(fd, F_SETFL, f | O_NONBLOCK);
948
+  }else{
949
+    fcntl(fd, F_SETFL, f & ~O_NONBLOCK);
950
+  }
940 951
 
941 952
   while(size){
942 953
     r = read(fd, buff, size);
954
+    e = errno;
955
+    /* EOF */
956
+    if(r == 0){
957
+      fcntl(fd, F_SETFL, f);
958
+      return(1);
959
+    }
960
+    /* ERROR */
943 961
     if(r == -1){
944
-      if(errno == EINTR){
962
+      if(e == EINTR){
945 963
         continue;
946 964
       }
965
+      fcntl(fd, F_SETFL, f);
966
+      errno = e;
947 967
       return(-1);
948 968
     }
949
-    if(r == 0){
950
-      return(1);
951
-    }
969
+    /* SUCCESS */
952 970
     size -= r;
953 971
     buff += r;
972
+    fcntl(fd, F_SETFL, f & ~O_NONBLOCK);
954 973
   }
974
+  fcntl(fd, F_SETFL, f);
955 975
   return(0);
956 976
 }
957 977
 
... ...
@@ -15,7 +15,7 @@ void recv_timeout(mfile *m)
15 15
     for(t=members;t;t=t->next){
16 16
       r = get_hoststate(t, m);
17 17
       if(*r == MAKUO_RECVSTATE_NONE){
18
-        member_del_message(t, "receive time out");
18
+        member_del_message(1, t, "receive time out");
19 19
         member_del(t);
20 20
         break;
21 21
       }
... ...
@@ -27,7 +27,7 @@ void recv_timeout(mfile *m)
27 27
 struct timeval *pingpong(int n)
28 28
 {
29 29
   static struct timeval tv;
30
-  mfile *m = mfins(0);
30
+  mfile *m = mfins(MFSEND);
31 31
   mping *p = NULL;
32 32
   char buff[MAKUO_HOSTNAME_MAX + 1];
33 33
 
... ...
@@ -186,6 +186,16 @@ int ismsend(mfile *m, int flag)
186 186
   if(!m){
187 187
     return(0);
188 188
   }
189
+  if(m->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){
190
+    return(0);
191
+  }
192
+  if(m->mdata.head.flags & MAKUO_FLAG_ACK){
193
+    if(flag){
194
+      msend(m);
195
+    }
196
+    return(0);
197
+  }
198
+
189 199
   if(!S_ISLNK(m->fs.st_mode) && S_ISDIR(m->fs.st_mode)){
190 200
     if(!mfdirchk(m)){
191 201
       return(0);
... ...
@@ -238,7 +248,7 @@ void mloop()
238 238
 
239 239
     /* udp packet send */
240 240
     para = 0;
241
-    m = mftop[0];
241
+    m = mftop[MFSEND];
242 242
     while(m){
243 243
       n = m->next;
244 244
       para += ismsend(m, 1);
... ...
@@ -256,7 +266,7 @@ void mloop()
256 256
     FD_ZERO(&wfds);
257 257
     FD_SET(moption.mcsocket,  &rfds);
258 258
     mcomm_fdset(moption.comm, &rfds);
259
-    for(m=mftop[0];m;m=m->next){
259
+    for(m=mftop[MFSEND];m;m=m->next){
260 260
       if(ismsend(m, 0)){
261 261
         FD_SET(moption.mcsocket, &wfds);
262 262
         break;
... ...
@@ -265,17 +275,17 @@ void mloop()
265 265
     tv.tv_sec  = 1;
266 266
     tv.tv_usec = 0;
267 267
     if(select(1024, &rfds, &wfds, NULL, &tv) == -1){
268
+      mrecv_gc();
268 269
       continue;
269 270
     }
270
-    moption.sendready = FD_ISSET(moption.mcsocket,&rfds);
271
-    mcomm_accept(moption.comm, &rfds, moption.lisocket); /* new console  */
272
-    mcomm_read(moption.comm, &rfds);                     /* command exec */
273
-    mrecv_gc();
271
+    moption.sendready = FD_ISSET(moption.mcsocket,&wfds);
272
+    mcomm_accept(moption.comm, &rfds, moption.lisocket);  /* new console  */
273
+    mcomm_read(moption.comm, &rfds);                      /* command exec */
274 274
   }
275 275
 
276 276
   /* shutdown notify */
277 277
   pingpong(2);
278
-  msend(mftop[0]);
278
+  msend(mftop[MFSEND]);
279 279
 }
280 280
 
281 281
 int main(int argc, char *argv[])
... ...
@@ -88,6 +88,7 @@
88 88
 #define MAKUO_SENDSTATE_LAST       5  /* 送信完了   */
89 89
 #define MAKUO_SENDSTATE_ERROR      6  /* エラー発生 */
90 90
 #define MAKUO_SENDSTATE_BREAK      7  /* 送信中断   */
91
+#define MAKUO_SENDSTATE_WAIT       8  /* 送信待機   */
91 92
 
92 93
 /*----- recvstatus -----*/
93 94
 #define MAKUO_RECVSTATE_NONE       0
... ...
@@ -296,7 +297,7 @@ char *strrstate(uint8_t n);
296 296
 char *strmstate(mdata *data);
297 297
 char *stropcode(mdata *data);
298 298
 char *strackreq(mdata *data);
299
-void mprintf(const char *func, mfile *m);
299
+void mprintf(int l, const char *func, mfile *m);
300 300
 void lprintf(int l, char *fmt, ...);
301 301
 void cprintf(int l, mcomm *c, char *fmt, ...);
302 302
 void fdprintf(int s, char *fmt, ...);
... ...
@@ -329,6 +330,7 @@ int mremove(char *base, char *name);
329 329
 int mcreatedir(char  *base, char *name, mode_t mode);
330 330
 int mcreatefile(char *base, char *name, mode_t mode);
331 331
 int mcreatelink(char *base, char *name, char *link);
332
+int atomic_read(int fd, void *buff, int size, int nb);
332 333
 void set_filestat(char *path, uid_t uid, gid_t gid, mode_t mode);
333 334
 
334 335
 /*----- uid/gid -----*/
... ...
@@ -339,7 +341,7 @@ int set_gids(char *groups);
339 339
 void   member_del(mhost *h);
340 340
 mhost *member_get(struct in_addr *addr);
341 341
 mhost *member_add(struct in_addr *addr, mdata *recvdata);
342
-void   member_del_message(mhost *t, char *mess);
342
+void   member_del_message(int err, mhost *t, char *mess);
343 343
 
344 344
 /*----- mark operation -----*/
345 345
 mmark   *delmark(mmark *mm);
... ...
@@ -379,7 +379,7 @@ static void mrecv_ack(mdata *data, struct sockaddr_in *addr)
379 379
 *******************************************************************/
380 380
 static mfile *mrecv_req_search(mdata *data, struct sockaddr_in *addr)
381 381
 {
382
-  mfile *m = mftop[1];
382
+  mfile *m = mftop[MFRECV];
383 383
   while(m){
384 384
     if(!memcmp(&m->addr, addr, sizeof(m->addr)) && m->mdata.head.reqid == data->head.reqid){
385 385
       break;
... ...
@@ -410,6 +410,7 @@ static void mrecv_req_ping(mdata *data, struct sockaddr_in *addr)
410 410
   a->mdata.p += p->versionlen;
411 411
   p->hostnamelen = htons(p->hostnamelen);
412 412
   p->versionlen  = htons(p->versionlen);
413
+  msend(a);
413 414
 }
414 415
 
415 416
 static void mrecv_req_exit(mdata *data, struct sockaddr_in *addr)
... ...
@@ -418,7 +419,7 @@ static void mrecv_req_exit(mdata *data, struct sockaddr_in *addr)
418 418
   if(!t){
419 419
     return;
420 420
   }
421
-  member_del_message(t, "member exit");
421
+  member_del_message(0, t, "member exit");
422 422
   member_del(t);
423 423
 }
424 424
 
... ...
@@ -575,11 +576,6 @@ static void mrecv_req_send_mark(mfile *m, mdata *r)
575 575
   msend(a);
576 576
 }
577 577
 
578
-static void mrecv_req_send_data_write_error(mfile *m, mdata *r)
579
-{
580
-  msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
581
-}
582
-
583 578
 static void mrecv_req_send_data_write(mfile *m, mdata *r)
584 579
 {
585 580
   off_t offset;
... ...
@@ -592,7 +588,7 @@ static void mrecv_req_send_data_write(mfile *m, mdata *r)
592 592
     m->mdata.head.error  = errno;
593 593
     m->mdata.head.ostate = m->mdata.head.nstate;
594 594
     m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
595
-    mrecv_req_send_data_write_error(m, r);
595
+    msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
596 596
     lprintf(0, "[error] %s: seek error (%s) seq=%u\n",
597 597
       __func__,
598 598
       strerror(m->mdata.head.error), 
... ...
@@ -603,7 +599,7 @@ static void mrecv_req_send_data_write(mfile *m, mdata *r)
603 603
     m->mdata.head.error  = errno;
604 604
     m->mdata.head.ostate = m->mdata.head.nstate;
605 605
     m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR;
606
-    mrecv_req_send_data_write_error(m, r);
606
+    msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
607 607
     lprintf(0, "[error] %s: write error (%s) seqno=%d size=%d %s\n",
608 608
       __func__,
609 609
       strerror(m->mdata.head.error), 
... ...
@@ -623,18 +619,17 @@ static void mrecv_req_send_data_retry(mfile *m, mdata *r)
623 623
     lprintf(0, "%s: out of momory\n", __func__);
624 624
     return;
625 625
   }
626
-  if(a->mdata.head.szdata){
627
-    return;
628
-  }
629
-  data_safeset32(&(a->mdata), m->mdata.head.seqno);
630
-  data_safeset32(&(a->mdata), r->head.seqno);
631
-  for(mm=m->mark;mm;mm=mm->next){
632
-    if(data_safeset32(&(a->mdata), mm->l)){
633
-      break;
634
-    }
635
-    if(data_safeset32(&(a->mdata), mm->h)){
636
-      a->mdata.head.szdata -= sizeof(uint32_t);
637
-      break;
626
+  if(a->mdata.head.szdata == 0){
627
+    data_safeset32(&(a->mdata), m->mdata.head.seqno);
628
+    data_safeset32(&(a->mdata), r->head.seqno);
629
+    for(mm=m->mark;mm;mm=mm->next){
630
+      if(data_safeset32(&(a->mdata), mm->l)){
631
+        break;
632
+      }
633
+      if(data_safeset32(&(a->mdata), mm->h)){
634
+        a->mdata.head.szdata -= sizeof(uint32_t);
635
+        break;
636
+      }
638 637
     }
639 638
   }
640 639
   msend(a);
... ...
@@ -884,7 +879,7 @@ static void mrecv_req_send(mdata *data, struct sockaddr_in *addr)
884 884
     mrecv_req_send_next(m, data);
885 885
   }else{
886 886
     if(data->head.nstate != MAKUO_SENDSTATE_DATA){
887
-      mkack(data, addr, MAKUO_RECVSTATE_IGNORE);
887
+      msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE));
888 888
     }
889 889
   }
890 890
 }
... ...
@@ -929,12 +924,12 @@ static void mrecv_req_md5_open(mfile *m, mdata *data, struct sockaddr_in *addr)
929 929
     }
930 930
   }
931 931
   mtimeget(&(m->lastrecv));
932
-  mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate);
932
+  msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate));
933 933
 }
934 934
 
935 935
 static void mrecv_req_md5_close(mfile *m, mdata *data, struct sockaddr_in *addr)
936 936
 {
937
-  mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
937
+  msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
938 938
   mrecv_mfdel(m);
939 939
 }
940 940
 
... ...
@@ -1089,11 +1084,11 @@ static int dsync_scan(int fd, char *base, int recurs, excludeitem *e)
1089 1089
 
1090 1090
 static void mrecv_req_dsync_open(mfile *m, mdata *data, struct sockaddr_in *addr)
1091 1091
 {
1092
-  mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1092
+  msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
1093 1093
   if(m){
1094 1094
     return;
1095 1095
   }
1096
-  m = mfadd(1);
1096
+  m = mfadd(MFRECV);
1097 1097
   m->mdata.head.opcode = data->head.opcode;
1098 1098
   m->mdata.head.reqid  = data->head.reqid;
1099 1099
   m->mdata.head.flags  = data->head.flags;
... ...
@@ -1113,7 +1108,7 @@ static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr
1113 1113
   char path[PATH_MAX];
1114 1114
   uint16_t len;
1115 1115
 
1116
-  mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1116
+  msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
1117 1117
   if(m->mdata.head.seqno >= data->head.seqno){
1118 1118
     return;
1119 1119
   }
... ...
@@ -1178,13 +1173,13 @@ static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr
1178 1178
 static void mrecv_req_dsync_close(mfile *m, mdata *data, struct sockaddr_in *addr)
1179 1179
 {
1180 1180
   if(!m){
1181
-    mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
1181
+    msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
1182 1182
     return;
1183 1183
   }
1184 1184
   if(m->link){
1185
-    mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1185
+    msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
1186 1186
   }else{
1187
-    mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
1187
+    msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
1188 1188
   }
1189 1189
 }
1190 1190
 
... ...
@@ -1209,7 +1204,7 @@ static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *add
1209 1209
     }
1210 1210
     mrecv_mfdel(m);
1211 1211
   }
1212
-  mkack(data, addr, MAKUO_RECVSTATE_BREAK);
1212
+  msend(mkack(data, addr, MAKUO_RECVSTATE_BREAK));
1213 1213
 }
1214 1214
 
1215 1215
 /*
... ...
@@ -1264,6 +1259,7 @@ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr)
1264 1264
       data_safeset(&(a->mdata), path, len);
1265 1265
     }
1266 1266
   }
1267
+  msend(a);
1267 1268
 }
1268 1269
 
1269 1270
 static void mrecv_req_del_data_report(mfile *m, mcomm *c, uint32_t err, char *hn, char *path)
... ...
@@ -1289,17 +1285,18 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1289 1289
   char *hn = "unknown host";
1290 1290
   mhost *t = member_get(&(addr->sin_addr));
1291 1291
   mcomm *c = NULL;
1292
+  mfile *a = NULL;
1292 1293
   mfile *m = mrecv_req_search(data, addr);
1293
-  mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1294 1294
   char path[PATH_MAX];
1295 1295
 
1296
+  msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN));
1296 1297
   if(m){
1297 1298
     return;
1298 1299
   }
1299 1300
   if(t){
1300 1301
     hn = t->hostname;
1301 1302
   }
1302
-  m = mfadd(1);
1303
+  m = mfadd(MFRECV);
1303 1304
   m->mdata.head.opcode = data->head.opcode;
1304 1305
   m->mdata.head.reqid  = data->head.reqid;
1305 1306
   m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
... ...
@@ -1307,7 +1304,7 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1307 1307
   if(data->head.flags & MAKUO_FLAG_DRYRUN){
1308 1308
     m->dryrun = 1;
1309 1309
   }
1310
-  for(a=mftop[0];a;a=a->next){
1310
+  for(a=mftop[MFSEND];a;a=a->next){
1311 1311
     if((a->mdata.head.reqid == data->head.seqno) && (a->comm != NULL)){
1312 1312
       c = a->comm;
1313 1313
       break;
... ...
@@ -1326,7 +1323,7 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1326 1326
 static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr)
1327 1327
 {
1328 1328
   mfile *m = mrecv_req_search(data, addr);
1329
-  mfile *a = mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
1329
+  msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE));
1330 1330
   mrecv_mfdel(m);
1331 1331
 }
1332 1332
 
... ...
@@ -1367,7 +1364,7 @@ static void mrecv_req(mdata *data, struct sockaddr_in *addr)
1367 1367
       mrecv_req_del(data,   addr);
1368 1368
       break;
1369 1369
     default:
1370
-      mkack(data, addr, MAKUO_RECVSTATE_IGNORE);
1370
+      msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE));
1371 1371
       break;
1372 1372
   }
1373 1373
 }
... ...
@@ -1380,13 +1377,12 @@ static void mrecv_req(mdata *data, struct sockaddr_in *addr)
1380 1380
 void mrecv_gc()
1381 1381
 {
1382 1382
   mhost *t = members;
1383
-  mfile *m = mftop[1]; 
1383
+  mfile *m = mftop[MFRECV]; 
1384 1384
 
1385 1385
   /* file timeout */
1386 1386
   while(m){
1387 1387
     if(mtimeout(&(m->lastrecv), MAKUO_RECV_GCWAIT)){
1388
-      lprintf(0,"%s: mfile object GC state=%s %s\n",
1389
-        __func__, strrstate(m->mdata.head.nstate), m->fn);
1388
+      lprintf(0,"%s: mfile object GC state=%s %s\n", __func__, strrstate(m->mdata.head.nstate), m->fn);
1390 1389
       m = mrecv_mfdel(m);
1391 1390
       continue;
1392 1391
     }
... ...
@@ -1399,7 +1395,7 @@ void mrecv_gc()
1399 1399
       t = t->next;
1400 1400
     }else{
1401 1401
       lprintf(0,"%s: pong timeout %s\n", __func__, t->hostname);
1402
-      member_del_message(t, "pong time out");
1402
+      member_del_message(1, t, "pong time out");
1403 1403
       if(t->next){
1404 1404
         t = t->next;
1405 1405
         member_del(t->prev);
... ...
@@ -1413,7 +1409,7 @@ void mrecv_gc()
1413 1413
 
1414 1414
 void mrecv_clean()
1415 1415
 {
1416
-  mfile *m = mftop[1];
1416
+  mfile *m = mftop[MFRECV];
1417 1417
   while(m=mrecv_mfdel(m));
1418 1418
 }
1419 1419
 
... ...
@@ -1424,7 +1420,7 @@ int mrecv()
1424 1424
   mdata data;
1425 1425
   struct sockaddr_in addr;
1426 1426
 
1427
-  m = mftop[0];
1427
+  m = mftop[MFSEND];
1428 1428
   if(mrecv_packet(moption.mcsocket, &data, &addr) == -1){
1429 1429
     return(0);
1430 1430
   }
... ...
@@ -1436,6 +1432,6 @@ int mrecv()
1436 1436
   }else{
1437 1437
     mrecv_req(&data, &addr);
1438 1438
   }
1439
-  return(m == mftop[0]);
1439
+  return(m == mftop[MFSEND]);
1440 1440
 }
1441 1441
 
... ...
@@ -100,7 +100,7 @@ static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
100 100
     moption.sendready = 0;
101 101
     r = sendto(s, &senddata, szdata, 0, (struct sockaddr*)addr, sizeof(struct sockaddr_in));
102 102
     if(r == szdata){
103
-      return(0); // success
103
+      return(0); /* success */
104 104
     }
105 105
     if(r == -1){
106 106
       if(errno == EINTR){
... ...
@@ -144,51 +144,34 @@ static int msend_retry(mfile *m)
144 144
     m->retrycnt = MAKUO_SEND_RETRYCNT;
145 145
     return(0);
146 146
   }
147
-  /*
148
-  if(m->mdata.head.opcode == MAKUO_OP_DSYNC){
149
-    if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
150
-      m->retrycnt--;
151
-      w = (MAKUO_SEND_RETRYCNT - m->retrycnt) * MAKUO_SEND_TIMEOUT;
152
-      if(w < 15000){
153
-        return(0);
154
-      }else{
155
-        m->retrycnt = MAKUO_SEND_RETRYCNT;
156
-      }
157
-    }
158
-  }
159
-  */
160
-  lprintf(2, "%s: send retry count=%02d rid=%06d op=%s state=%s %s\n", 
161
-    __func__,
162
-    m->retrycnt, 
163
-    m->mdata.head.reqid, 
164
-    stropcode(&(m->mdata)),
165
-    strmstate(&(m->mdata)), 
166
-    m->fn);
167
-  for(t=members;t;t=t->next){
168
-    r = get_hoststate(t, m);
169
-    if(!r){
170
-      lprintf(0, "%s: can't alloc state area %s\n",
171
-        __func__, 
172
-        t->hostname);
173
-      continue;
174
-    }
175
-    switch(moption.loglevel){
176
-      case 2:
177
-        if(*r == MAKUO_RECVSTATE_NONE){
178
-          lprintf(0, "%s:   %s %s(%s)\n", 
179
-            __func__, 
180
-           strrstate(*r), 
181
-           inet_ntoa(t->ad), 
182
-           t->hostname);
183
-        }
184
-        break;
185
-      default:
186
-        lprintf(3, "%s:   %s %s(%s)\n", 
147
+  if(moption.loglevel > 1){
148
+    mprintf(2, __func__, m); 
149
+    for(t=members;t;t=t->next){
150
+      r = get_hoststate(t, m);
151
+      if(!r){
152
+        lprintf(0, "%s: can't alloc state area %s\n",
187 153
           __func__, 
188
-          strrstate(*r), 
189
-          inet_ntoa(t->ad), 
190 154
           t->hostname);
191
-        break;
155
+        continue;
156
+      }
157
+      switch(moption.loglevel){
158
+        case 2:
159
+          if(*r == MAKUO_RECVSTATE_NONE){
160
+            lprintf(0, "%s:   %s %s(%s)\n", 
161
+              __func__, 
162
+             strrstate(*r), 
163
+             inet_ntoa(t->ad), 
164
+             t->hostname);
165
+          }
166
+          break;
167
+        default:
168
+          lprintf(3, "%s:   %s %s(%s)\n", 
169
+            __func__, 
170
+            strrstate(*r), 
171
+            inet_ntoa(t->ad), 
172
+            t->hostname);
173
+          break;
174
+      }
192 175
     }
193 176
   }
194 177
   m->retrycnt--;
... ...
@@ -905,19 +888,29 @@ static int msend_req_del_stat_read_pathcmp(int s, mfile *m)
905 905
 
906 906
 static void msend_req_del_stat_read(int s, mfile *m)
907 907
 {
908
+  int r;
908 909
   mfile *d;
909 910
   uint16_t len;
910 911
 
911
-  d = mkreq(&(m->mdata), &(m->addr), MAKUO_SENDSTATE_OPEN);
912
-  d->mdata.head.flags = m->mdata.head.flags;
913
-  d->mdata.head.reqid = getrid();
914
-  d->initstate = 1;
915
-  d->sendwait  = 0;
916
-  d->sendto    = 1;
917
-  d->dryrun    = m->dryrun;
918
-  d->recurs    = m->recurs;
919
-  d->link      = m;
920
-  d->mdata.p   = d->mdata.data;
912
+  for(d=mftop[MFSEND];d;d=d->next){
913
+    if(d->link == m){
914
+      if(d->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){
915
+        break;
916
+      }
917
+    }
918
+  }
919
+  if(!d){
920
+    d = mkreq(&(m->mdata), &(m->addr), MAKUO_SENDSTATE_WAIT);
921
+    d->mdata.head.flags = m->mdata.head.flags;
922
+    d->mdata.head.reqid = getrid();
923
+    d->initstate = 1;
924
+    d->sendwait  = 0;
925
+    d->sendto    = 1;
926
+    d->dryrun    = m->dryrun;
927
+    d->recurs    = m->recurs;
928
+    d->link      = m;
929
+    d->mdata.p   = d->mdata.data;
930
+  }
921 931
 
922 932
   if(m->len >= sizeof(m->mod)){
923 933
     len = m->len - sizeof(m->mod);
... ...
@@ -928,7 +921,14 @@ static void msend_req_del_stat_read(int s, mfile *m)
928 928
   }
929 929
 
930 930
   while(1){
931
-    if(atomic_read(m->pipe, &(m->len), sizeof(m->len))){
931
+    if(r = atomic_read(m->pipe, &(m->len), sizeof(m->len), 1)){
932
+      if(r == -1){
933
+        if(errno == EAGAIN){
934
+          return;
935
+        }else{
936
+          lprintf(0, "[error] %s: length read error\n", __func__);
937
+        }
938
+      }
932 939
       break;
933 940
     }
934 941
     if(m->len <= sizeof(m->mod)){
... ...
@@ -936,11 +936,11 @@ static void msend_req_del_stat_read(int s, mfile *m)
936 936
       break;
937 937
     }
938 938
     len = m->len - sizeof(m->mod);
939
-    if(atomic_read(m->pipe, &(m->mod), sizeof(m->mod))){
939
+    if(atomic_read(m->pipe, &(m->mod), sizeof(m->mod), 0)){
940 940
       lprintf(0, "[error] %s: filemode read error\n", __func__);
941 941
       break;
942 942
     }
943
-    if(atomic_read(m->pipe, m->tn, len)){
943
+    if(atomic_read(m->pipe, m->tn, len, 0)){
944 944
       lprintf(0, "[error] %s: filename read error\n", __func__);
945 945
       break;
946 946
     }
... ...
@@ -956,13 +956,17 @@ static void msend_req_del_stat_read(int s, mfile *m)
956 956
       continue;
957 957
     }
958 958
     if(d->mdata.head.szdata + sizeof(m->len) + m->len > MAKUO_BUFFER_SIZE){
959
-      return; /* packet full */
959
+      d->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
960
+    }else{
961
+      strcpy(d->fn, m->tn);
962
+      data_safeset16(&(d->mdata), m->len);
963
+      data_safeset32(&(d->mdata), m->mod);
964
+      data_safeset(&(d->mdata), m->tn, len);
965
+      m->len = 0;
960 966
     }
961
-    data_safeset16(&(d->mdata), m->len);
962
-    data_safeset32(&(d->mdata), m->mod);
963
-    data_safeset(&(d->mdata), m->tn, len);
964
-    m->len = 0;
967
+    return;
965 968
   }
969
+  d->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
966 970
   close(m->pipe);
967 971
   m->pipe      = -1;
968 972
   m->initstate =  1;
... ...
@@ -1011,6 +1015,15 @@ static void msend_req_del_last(int s, mfile *m)
1011 1011
 
1012 1012
 static void msend_req_del_break(int s, mfile *m)
1013 1013
 {
1014
+  mfile *d;
1015
+  for(d=mftop[MFSEND];d;d=d->next){
1016
+    if(d->link == m){
1017
+      if(d->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){
1018
+        msend_mfdel(d);
1019
+        break;
1020
+      }
1021
+    }
1022
+  }
1014 1023
   msend_mfdel(m);
1015 1024
   lprintf(0,"%s: break dsync\n", __func__);
1016 1025
 }
... ...
@@ -1136,19 +1149,18 @@ static void msend_req(int s, mfile *m)
1136 1136
 *******************************************************************/
1137 1137
 void msend(mfile *m)
1138 1138
 {
1139
-  if(msend_retry(m)){
1140
-    return;
1141
-  }
1142
-  mtimeget(&m->lastsend);
1143 1139
   if(m->mdata.head.flags & MAKUO_FLAG_ACK){
1144 1140
     msend_ack(moption.mcsocket, m); 
1145 1141
   }else{
1146
-    msend_req(moption.mcsocket, m); 
1142
+    if(!msend_retry(m)){
1143
+      msend_req(moption.mcsocket, m); 
1144
+      mtimeget(&m->lastsend);
1145
+    }
1147 1146
   }
1148 1147
 }
1149 1148
 void msend_clean()
1150 1149
 {
1151
-  mfile *m = mftop[0];
1150
+  mfile *m = mftop[MFSEND];
1152 1151
   while(m=msend_mfdel(m));
1153 1152
 }
1154 1153
 
... ...
@@ -28,6 +28,7 @@ char *sstatestrlist[]={"SEND_STAT   ",
28 28
                        "SEND_LAST   ",
29 29
                        "SEND_ERROR  ",
30 30
                        "SEND_BREAK  ",
31
+                       "SEND_WAIT   ",
31 32
                        "SEND_UNKNOWN"};
32 33
 
33 34
 uint8_t sstatenumlist[]={MAKUO_SENDSTATE_STAT,
... ...
@@ -38,6 +39,7 @@ uint8_t sstatenumlist[]={MAKUO_SENDSTATE_STAT,
38 38
                          MAKUO_SENDSTATE_LAST,
39 39
                          MAKUO_SENDSTATE_ERROR,
40 40
                          MAKUO_SENDSTATE_BREAK,
41
+                         MAKUO_SENDSTATE_WAIT,
41 42
                          MAKUO_STATE_MAX};
42 43
 
43 44
 char *rstatestrlist[] = {"RECV_NONE    ",
... ...
@@ -182,11 +184,11 @@ void cprintf(int l, mcomm *c, char *fmt, ...)
182 182
   }
183 183
 }
184 184
 
185
-void mprintf(const char *func, mfile *m)
185
+void mprintf(int l, const char *func, mfile *m)
186 186
 {
187 187
   if(!m)
188 188
     return;
189
-  lprintf(9, "%s: rc=%d rid=%d init=%d wait=%d %s %s %s %s\n",
189
+  lprintf(l, "%s: rc=%d rid=%d init=%d wait=%d %s %s %s %s\n",
190 190
     func, 
191 191
     m->retrycnt,
192 192
     m->mdata.head.reqid,