Browse code

Ver0.9.0

Masanobu Yasui authored on 2008/10/08 11:10:14
Showing 6 changed files
... ...
@@ -287,21 +287,23 @@ int seq_addmark(mfile *m, uint32_t lseq, uint32_t useq)
287 287
       lprintf(0,"%s: out of memory\n", __func__);
288 288
       return(-1); 
289 289
     }
290
-    m->mark = n;
290
+    m->mark     = n;
291 291
     m->marksize = size;
292 292
 
293 293
     /***** retry(apap) *****/
294
-    a = mfins(0);
295
-    if(!a){
296
-      lprintf(0,"%s: out of memory\n", __func__);
297
-    }else{
298
-      a->mdata.head.flags |= MAKUO_FLAG_ACK;
299
-      a->mdata.head.opcode = m->mdata.head.opcode;
300
-      a->mdata.head.reqid  = m->mdata.head.reqid;
301
-      a->mdata.head.szdata = 0;
302
-      a->mdata.head.seqno  = m->mdata.head.seqno;
303
-      a->mdata.head.nstate = MAKUO_RECVSTATE_RETRY;
304
-      memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
294
+    if(m->mdata.head.nstate == MAKUO_RECVSTATE_OPEN){
295
+      a = mfins(0);
296
+      if(!a){
297
+        lprintf(0,"%s: out of memory\n", __func__);
298
+      }else{
299
+        a->mdata.head.flags |= MAKUO_FLAG_ACK;
300
+        a->mdata.head.opcode = m->mdata.head.opcode;
301
+        a->mdata.head.reqid  = m->mdata.head.reqid;
302
+        a->mdata.head.szdata = 0;
303
+        a->mdata.head.seqno  = m->mdata.head.seqno;
304
+        a->mdata.head.nstate = MAKUO_RECVSTATE_RETRY;
305
+        memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
306
+      }
305 307
     }
306 308
   }
307 309
   for(i=lseq;i<useq;i++){
... ...
@@ -271,7 +271,12 @@ int ismsend(mfile *m)
271 271
   }
272 272
   if(!r){
273 273
     m->sendwait = 0;
274
-    return(1);
274
+    if(!(m->senddelay)){
275
+      return(1);
276
+    }
277
+    if(mtimeout(&(m->lastsend), m->senddelay)){
278
+      return(1);
279
+    }
275 280
   }
276 281
   if(mtimeout(&(m->lastsend), MAKUO_SEND_TIMEOUT)){
277 282
     if(m->retrycnt){
... ...
@@ -51,6 +51,7 @@
51 51
 /*----- timeout -----*/
52 52
 #define MAKUO_SEND_TIMEOUT  500    /* 再送間隔(ms)                                 */
53 53
 #define MAKUO_SEND_RETRYCNT 120    /* 再送回数                                     */
54
+#define MAKUO_SEND_DELAYSTP 1      /* 送出遅延時間の増分(ms)                        */
54 55
 #define MAKUO_PONG_TIMEOUT  180000 /* メンバから除外するまでの時間(ms)             */
55 56
 #define MAKUO_PONG_INTERVAL 45000  /* PING送信間隔(ms)                             */
56 57
 #define MAKUO_RECV_GCWAIT   300000 /* 消し損ねたオブジェクトを開放する待ち時間(ms) */
... ...
@@ -181,6 +182,7 @@ typedef struct
181 181
   uint32_t retrycnt;
182 182
   uint32_t sendwait;
183 183
   uint32_t lickflag;
184
+  uint32_t senddelay;
184 185
   uint32_t initstate;
185 186
   uint32_t recvcount;
186 187
   uint32_t markcount;
... ...
@@ -475,17 +475,30 @@ int mexec_status(mcomm *c, int n)
475 475
 {
476 476
   int count;
477 477
   mfile  *m;
478
-  cprintf(0,c,"MAKUOSAN version %s\n", MAKUOSAN_VERSION);
479
-  
478
+  struct tm *t;
479
+
480
+  cprintf(0,c,"version  : %s\n", MAKUOSAN_VERSION);
480 481
   count = 0;
481 482
   for(m=mftop[0];m;m=m->next)
482 483
     count++;
483
-  cprintf(0,c,"send object: %d\n", count);
484
+  cprintf(0,c,"send file: %d\n", count);
485
+  for(m=mftop[0];m;m=m->next){
486
+    if(m->lickflag){
487
+      cprintf(0, c, "  state=%d %s (%d/%d)\n",m->mdata.head.nstate, m->fn, m->mdata.head.seqno,m->seqnomax); 
488
+    }else{
489
+      cprintf(0, c, "  state=%d %s (%d/%d)\n",m->mdata.head.nstate, m->fn, m->mdata.head.seqno,m->seqnomax); 
490
+    }
491
+  }
484 492
 
485 493
   count = 0;
486 494
   for(m=mftop[1];m;m=m->next)
487 495
     count++;
488
-  cprintf(0,c,"recv object: %d\n", count);
496
+  cprintf(0, c, "recv file: %d\n", count);
497
+  for(m=mftop[1];m;m=m->next){
498
+    t = localtime(&(m->lastrecv.tv_sec));
499
+    cprintf(0, c, "  state=%d %02d:%02d:%02d %s (%d/%d) mark=%d\n",
500
+      m->mdata.head.nstate, t->tm_hour, t->tm_min, t->tm_sec, m->fn, m->recvcount, m->seqnomax, m->markcount); 
501
+  }
489 502
   return(0);
490 503
 }
491 504
 
... ...
@@ -228,8 +228,10 @@ static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr)
228 228
     }
229 229
   }
230 230
   if(data->head.nstate == MAKUO_RECVSTATE_RETRY){
231
-    lprintf(1, "%s: send retry %s from %s\n", __func__, m->fn, t->hostname);
232
-    m->lickflag = 0;
231
+    lprintf(0, "%s: send retry %s from %s\n", __func__, m->fn, t->hostname);
232
+    m->sendwait   = 0;
233
+    m->lickflag   = 0;
234
+    m->senddelay += MAKUO_SEND_DELAYSTP;
233 235
     m->mdata.head.seqno  = 0;
234 236
     m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
235 237
   }else{
... ...
@@ -502,7 +504,6 @@ static void mrecv_req_send_mark(mfile *m, mdata *r)
502 502
   a->mdata.head.szdata = 0;
503 503
   memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
504 504
   m->lickflag = 1;
505
-  a->lickflag = 1;
506 505
   if(m->mdata.head.seqno < m->seqnomax){
507 506
     seq_addmark(m, m->mdata.head.seqno, m->seqnomax);
508 507
     m->mdata.head.seqno = m->seqnomax;
... ...
@@ -530,24 +531,12 @@ static void mrecv_req_send_close(mfile *m, mdata *r)
530 530
   sprintf(fpath, "%s/%s", moption.base_dir, m->fn);
531 531
   sprintf(tpath, "%s/%s", moption.base_dir, m->tn);
532 532
 
533
-  switch(m->mdata.head.nstate){
534
-    case MAKUO_RECVSTATE_OPEN:
535
-    case MAKUO_RECVSTATE_UPDATE:
536
-    case MAKUO_RECVSTATE_MARK:
537
-      m->mdata.head.ostate = m->mdata.head.nstate;
538
-      m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
539
-      break;
540
-    case MAKUO_RECVSTATE_CLOSE:
541
-    case MAKUO_RECVSTATE_CLOSEERROR:
542
-      break;
543
-    default:
544
-      return;
545
-  }
546
-
547
-  if(m->fd != -1){
548
-    fstat(m->fd, &fs);
549
-    close(m->fd);
550
-    m->fd = -1;
533
+  if(m->mdata.head.nstate == MAKUO_RECVSTATE_OPEN){
534
+    if(m->fd != -1){
535
+      fstat(m->fd, &fs);
536
+      close(m->fd);
537
+      m->fd = -1;
538
+    }
551 539
     mftime.actime  = m->fs.st_ctime; 
552 540
     mftime.modtime = m->fs.st_mtime;
553 541
     if(S_ISLNK(m->fs.st_mode)){
... ...
@@ -566,7 +555,7 @@ static void mrecv_req_send_close(mfile *m, mdata *r)
566 566
         if(fs.st_size != m->fs.st_size){
567 567
           m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR;
568 568
           lprintf(0, "%s: close error %s (file size mismatch %d != %d)\n", __func__, m->fn, (int)(fs.st_size), (int)(m->fs.st_size));
569
-          lprintf(0, "%s: seq=%d max=%d markcnt=%d\n", __func__, m->mdata.head.seqno, m->seqnomax, m->markcount);
569
+          lprintf(0, "%s: seq=%d max=%d mark=%d recv=%d\n", __func__, m->mdata.head.seqno, m->seqnomax, m->markcount, m->recvcount);
570 570
           mremove(moption.base_dir, m->tn);
571 571
         }else{
572 572
           if(!mrename(moption.base_dir, m->tn, m->fn)){
... ...
@@ -583,6 +572,20 @@ static void mrecv_req_send_close(mfile *m, mdata *r)
583 583
     }
584 584
   }
585 585
 
586
+  switch(m->mdata.head.nstate){
587
+    case MAKUO_RECVSTATE_OPEN:
588
+    case MAKUO_RECVSTATE_UPDATE:
589
+    case MAKUO_RECVSTATE_MARK:
590
+      m->mdata.head.ostate = m->mdata.head.nstate;
591
+      m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE;
592
+      break;
593
+    case MAKUO_RECVSTATE_CLOSE:
594
+    case MAKUO_RECVSTATE_CLOSEERROR:
595
+      break;
596
+    default:
597
+      return;
598
+  }
599
+
586 600
   a = mfins(0);
587 601
   if(!a){
588 602
     lprintf(0,"%s: out of memory\n", __func__);
... ...
@@ -328,8 +328,8 @@ static void msend_req_send_open(int s, mfile *m)
328 328
   }
329 329
   lprintf(9,"%s: %s\n", __func__, m->fn);
330 330
   if(ack_check(m, MAKUO_RECVSTATE_OPEN) != 1){
331
-    m->initstate = 1;
332
-    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
331
+    m->sendwait  = 1;
332
+    ack_clear(m, MAKUO_RECVSTATE_UPDATE);
333 333
   }else{
334 334
     if(S_ISLNK(m->fs.st_mode)){
335 335
       m->initstate = 1;
... ...
@@ -461,6 +461,16 @@ static void msend_req_send_close(int s, mfile *m)
461 461
     msend_packet(s, &(m->mdata), &(m->addr));
462 462
     return;
463 463
   }
464
+  if(ack_check(m, MAKUO_RECVSTATE_OPEN) == 1){
465
+    m->sendwait  = 1;
466
+    ack_clear(m, MAKUO_RECVSTATE_OPEN);
467
+    return;
468
+  }
469
+  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
470
+    m->sendwait  = 1;
471
+    ack_clear(m, MAKUO_RECVSTATE_UPDATE);
472
+    return;
473
+  }
464 474
   lprintf(9,"%s: %s\n", __func__, m->fn);
465 475
   m->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
466 476
 }