Browse code

Ver0.9.0

Masanobu Yasui authored on 2008/10/08 23:21:42
Showing 4 changed files
... ...
@@ -271,7 +271,7 @@ int seq_addmark(mfile *m, uint32_t lseq, uint32_t useq)
271 271
 
272 272
   if(!m->mark){
273 273
     m->markcount = 0;
274
-    m->marksize  = 512;
274
+    m->marksize  = 1024;
275 275
     m->mark = malloc(sizeof(uint32_t) * m->marksize);
276 276
     if(!m->mark){
277 277
       lprintf(0,"%s: out of memory\n", __func__);
... ...
@@ -280,7 +280,7 @@ int seq_addmark(mfile *m, uint32_t lseq, uint32_t useq)
280 280
   }
281 281
   size = m->marksize;
282 282
   while(size < m->markcount + useq - lseq)
283
-    size += 512;
283
+    size += 1024;
284 284
   if(size != m->marksize){
285 285
     n = realloc(m->mark, sizeof(uint32_t) * size);
286 286
     if(!n){
... ...
@@ -296,6 +296,7 @@ int seq_addmark(mfile *m, uint32_t lseq, uint32_t useq)
296 296
       if(!a){
297 297
         lprintf(0,"%s: out of memory\n", __func__);
298 298
       }else{
299
+        lprintf(0,"%s: mark over(%d/%d) retry %s\n", __func__, m->markcount, m->marksize, m->fn);
299 300
         a->mdata.head.flags |= MAKUO_FLAG_ACK;
300 301
         a->mdata.head.opcode = m->mdata.head.opcode;
301 302
         a->mdata.head.reqid  = m->mdata.head.reqid;
... ...
@@ -350,7 +351,6 @@ void clr_hoststate(mfile *m)
350 350
 {
351 351
   int    i;
352 352
   mhost *t;
353
-
354 353
   for(t=members;t;t=t->next){
355 354
     for(i=0;i<MAKUO_PARALLEL_MAX;i++){
356 355
       if(t->mflist[i] == m){
... ...
@@ -361,6 +361,17 @@ void clr_hoststate(mfile *m)
361 361
   }
362 362
 }
363 363
 
364
+void dump_hoststate(mfile *m, char *func)
365
+{
366
+  mhost   *t;
367
+  uint8_t *r;
368
+  for(t=members;t;t=t->next){
369
+    if(r=get_hoststate(t,m)){
370
+      lprintf(9,"%s: state=%d from %s %s\n", func, (int)(*r), t->hostname, m->fn);
371
+    }
372
+  }
373
+}
374
+
364 375
 uint8_t *get_hoststate(mhost *t, mfile *m)
365 376
 {
366 377
   int i;
... ...
@@ -81,18 +81,20 @@ int restoreguid()
81 81
 
82 82
 void recv_timeout(mfile *m)
83 83
 {
84
-  mhost *h;
84
+  mhost   *t;
85
+  uint8_t *r;
85 86
   if(m){
86 87
     m->retrycnt = MAKUO_SEND_RETRYCNT;
87 88
     do{
88
-      for(h=members;h;h=h->next){
89
-        if(h->state == MAKUO_RECVSTATE_NONE){
90
-          lprintf(0,"%s: %s(%s) timeout\n", __func__, inet_ntoa(h->ad), h->hostname);
91
-          member_del(h);
89
+      for(t=members;t;t=t->next){
90
+        r = get_hoststate(t, m);
91
+        if(*r == MAKUO_RECVSTATE_NONE){
92
+          lprintf(0,"%s: %s(%s) timeout\n", __func__, inet_ntoa(t->ad), t->hostname);
93
+          member_del(t);
92 94
           break;
93 95
         }
94 96
       }
95
-    }while(h); 
97
+    }while(t); 
96 98
   }
97 99
 }
98 100
 
... ...
@@ -254,44 +256,45 @@ int mcomm_fdset(mcomm *c, fd_set *fds)
254 254
   return(0);
255 255
 }
256 256
 
257
-int ismsend(mfile *m)
257
+int ismsend(int s, mfile *m)
258 258
 {
259 259
   int r;
260
-
261
-  if(!m)
260
+  if(!m){
262 261
     return(0);
263
-  if(!m->sendwait){
264
-    return(1);
265 262
   }
266
-  r = ack_check(m,MAKUO_RECVSTATE_NONE);
263
+  if(!S_ISLNK(m->fs.st_mode) && S_ISDIR(m->fs.st_mode)){
264
+    if(m != mftop[0]){
265
+      return(0);
266
+    }
267
+  }
268
+  if(m->senddelay){
269
+    if(!mtimeout(&(m->lastsend), m->senddelay)){
270
+      return(1);
271
+    }
272
+  }
273
+  r = ack_check(m, MAKUO_RECVSTATE_NONE);
267 274
   if(r == -1){
268 275
     m->mdata.head.seqno  = 0;
269 276
     m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
270
-    return(1);
271 277
   }
272 278
   if(!r){
273 279
     m->sendwait = 0;
274
-    if(!(m->senddelay)){
275
-      return(1);
276
-    }
277
-    if(mtimeout(&(m->lastsend), m->senddelay)){
278
-      return(1);
279
-    }
280 280
   }
281
-  if(mtimeout(&(m->lastsend), MAKUO_SEND_TIMEOUT)){
282
-    if(m->retrycnt){
281
+  if(m->sendwait){
282
+    if(!mtimeout(&(m->lastsend), MAKUO_SEND_TIMEOUT)){
283 283
       return(1);
284
-    }else{
284
+    }
285
+    if(!(m->retrycnt)){
285 286
       recv_timeout(m);
286 287
     }
287 288
   }
288
-  return(0);
289
+  msend(s,m);
290
+  return(1);
289 291
 }
290 292
 
291 293
 /***** main loop *****/
292 294
 int mloop()
293 295
 {
294
-  int i;
295 296
   fd_set rfds;
296 297
   fd_set wfds;
297 298
   struct timeval *lastpong;
... ...
@@ -321,23 +324,14 @@ int mloop()
321 321
 
322 322
     gettimeofday(&curtime,NULL);
323 323
     if(FD_ISSET(moption.mcsocket,&wfds)){
324
-      mfile *m = mftop[0];
324
+      int para = 0;
325 325
       mfile *n = NULL;
326
-      for(i=0;i<moption.parallel;i++){
326
+      mfile *m = mftop[0];
327
+      while(m){
327 328
         n = m->next;
328
-        if(ismsend(m)){
329
-          if(S_ISLNK(m->fs.st_mode) || !S_ISDIR(m->fs.st_mode)){
330
-            msend(moption.mcsocket, m);
331
-          }else{
332
-            if(m == mftop[0]){
333
-              msend(moption.mcsocket, m);
334
-            }else{
335
-              i--;
336
-            }
337
-          }
338
-        }
329
+        para += ismsend(moption.mcsocket, m);
339 330
         m = n;
340
-        if(!m){
331
+        if(para == moption.parallel){
341 332
           break;
342 333
         }
343 334
       }
... ...
@@ -202,10 +202,9 @@ static void mrecv_ack_ping(mdata *data, struct sockaddr_in *addr)
202 202
 
203 203
 static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr)
204 204
 {
205
-  uint8_t *s;
205
+  uint8_t *r;
206 206
   mhost   *t;
207 207
   mfile   *m;
208
-
209 208
   if(mrecv_ack_search(&t, &m, data, addr)){
210 209
     return;
211 210
   }
... ...
@@ -215,7 +214,7 @@ static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr)
215 215
     lprintf(0,          "%s: file update ignore rid=%06d state=%02d %s(%s) %s\n", __func__, 
216 216
       data->head.reqid, data->head.nstate, inet_ntoa(t->ad), t->hostname, m->fn);
217 217
   }
218
-  if(data->head.nstate == MAKUO_RECVSTATE_OPEN){
218
+  if(data->head.nstate == MAKUO_RECVSTATE_MARK){
219 219
     uint32_t *d = (uint32_t *)(data->data);
220 220
     while(d < (uint32_t *)&data->data[data->head.szdata]){
221 221
       if(*d >= m->seqnomax){
... ...
@@ -235,8 +234,8 @@ static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr)
235 235
     m->mdata.head.seqno  = 0;
236 236
     m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
237 237
   }else{
238
-    if(s = get_hoststate(t, m)){
239
-      *s = data->head.nstate;
238
+    if(r = get_hoststate(t, m)){
239
+      *r = data->head.nstate;
240 240
     }else{
241 241
       lprintf(0, "%s: hoststate error\n", __func__);
242 242
     }
... ...
@@ -249,7 +248,6 @@ static void mrecv_ack_md5(mdata *data, struct sockaddr_in *addr)
249 249
   uint8_t *s;
250 250
   mhost   *t;
251 251
   mfile   *m;
252
-
253 252
   mrecv_ack_search(&t, &m, data, addr);
254 253
   if(!t || !m){
255 254
     return;
... ...
@@ -286,7 +284,6 @@ static void mrecv_ack(mdata *data, struct sockaddr_in *addr)
286 286
     case MAKUO_OP_MD5:
287 287
       mrecv_ack_md5(data, addr);
288 288
       break;
289
-
290 289
     /* 機能追加はここへ */
291 290
   }
292 291
 }
... ...
@@ -345,7 +342,7 @@ static void mrecv_req_send_break(mfile *m, mdata *r)
345 345
 
346 346
 static void mrecv_req_send_stat(mfile *m, mdata *r)
347 347
 {
348
-  mfile *a;
348
+  mfile  *a;
349 349
   struct stat fs;
350 350
   struct utimbuf mftime;
351 351
 
... ...
@@ -500,8 +497,10 @@ static void mrecv_req_send_mark(mfile *m, mdata *r)
500 500
   a->mdata.head.opcode = r->head.opcode;
501 501
   a->mdata.head.reqid  = r->head.reqid;
502 502
   a->mdata.head.seqno  = r->head.seqno;
503
-  a->mdata.head.nstate = m->mdata.head.nstate;
503
+  a->mdata.head.ostate = m->mdata.head.nstate;
504
+  a->mdata.head.nstate = MAKUO_RECVSTATE_MARK;
504 505
   a->mdata.head.szdata = 0;
506
+
505 507
   memcpy(&(a->addr), &(m->addr), sizeof(a->addr));
506 508
   m->lickflag = 1;
507 509
   if(m->mdata.head.seqno < m->seqnomax){
... ...
@@ -292,6 +292,7 @@ static void msend_req_send_open_init(int s, mfile *m)
292 292
   m->sendwait  = 1;
293 293
   m->initstate = 0;
294 294
   ack_clear(m, MAKUO_RECVSTATE_UPDATE);
295
+
295 296
   /*----- symlink -----*/
296 297
   if(S_ISLNK(m->fs.st_mode)){
297 298
     msend_packet(s, &(m->mdata), &(m->addr));
... ...
@@ -326,23 +327,23 @@ static void msend_req_send_open(int s, mfile *m)
326 326
     msend_packet(s, &(m->mdata), &(m->addr));
327 327
     return;
328 328
   }
329
-  lprintf(9,"%s: %s\n", __func__, m->fn);
330
-  if(ack_check(m, MAKUO_RECVSTATE_OPEN) != 1){
331
-    m->sendwait  = 1;
329
+  lprintf(9, "%s: %s\n", __func__, m->fn);
330
+  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
331
+    m->sendwait = 1;
332 332
     ack_clear(m, MAKUO_RECVSTATE_UPDATE);
333
+    return;
334
+  }
335
+  if(S_ISLNK(m->fs.st_mode)){
336
+    m->initstate = 1;
337
+    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
333 338
   }else{
334
-    if(S_ISLNK(m->fs.st_mode)){
339
+    if(S_ISDIR(m->fs.st_mode)){
335 340
       m->initstate = 1;
336 341
       m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
337
-    }else{
338
-      if(S_ISDIR(m->fs.st_mode)){
339
-        m->initstate = 1;
340
-        m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
341
-      }
342
-      if(S_ISREG(m->fs.st_mode)){
343
-        m->mdata.head.seqno  = 0;
344
-        m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
345
-      }
342
+    }
343
+    if(S_ISREG(m->fs.st_mode)){
344
+      m->mdata.head.seqno  = 0;
345
+      m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
346 346
     }
347 347
   }
348 348
 }
... ...
@@ -351,6 +352,7 @@ static void msend_req_send_markdata(int s, mfile *m)
351 351
 {
352 352
   int i;
353 353
   int r;
354
+  dump_hoststate(m, __func__);
354 355
   if(!m->markcount){
355 356
     /* close */
356 357
     m->initstate = 1;
... ...
@@ -375,11 +377,11 @@ static void msend_req_send_markdata(int s, mfile *m)
375 375
       }
376 376
     }
377 377
   }
378
-  /* eof */
379 378
   m->markcount = 0;
380 379
   m->initstate = 1;
381 380
   m->mdata.head.seqno  = 0;
382 381
   m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
382
+  ack_clear(m, MAKUO_RECVSTATE_MARK);
383 383
 }
384 384
 
385 385
 static void msend_req_send_filedata(int s, mfile *m)
... ...
@@ -395,10 +397,10 @@ static void msend_req_send_filedata(int s, mfile *m)
395 395
   }else{
396 396
     if(readsize == -1){
397 397
       /* err */
398
-      lprintf(0, "%s: read error! seqno=%d errno=%d\n", __func__, m->mdata.head.seqno, errno);
398
+      lprintf(0, "%s: read error! seqno=%d errno=%d %s\n", __func__, m->mdata.head.seqno, errno, m->fn);
399 399
     }else{
400 400
       /* eof */
401
-      lprintf(4, "%s: block send count=%d\n", __func__, m->mdata.head.seqno);
401
+      lprintf(4, "%s: block send count=%d %s\n", __func__, m->mdata.head.seqno, m->fn);
402 402
       m->mdata.head.seqno  = 0;
403 403
       m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
404 404
       m->initstate = 1;
... ...
@@ -425,6 +427,7 @@ static void msend_req_send_mark_init(int s, mfile *m)
425 425
 {
426 426
   m->sendwait  = 1;
427 427
   m->initstate = 0;
428
+  ack_clear(m, MAKUO_RECVSTATE_UPDATE);
428 429
   ack_clear(m, MAKUO_RECVSTATE_OPEN);
429 430
   msend_packet(s, &(m->mdata), &(m->addr));
430 431
 }
... ...
@@ -439,6 +442,15 @@ static void msend_req_send_mark(int s, mfile *m)
439 439
     msend_packet(s, &(m->mdata), &(m->addr));
440 440
     return;
441 441
   }
442
+  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
443
+    msend_req_send_mark_init(s, m);
444
+    return;
445
+  }
446
+  if(ack_check(m, MAKUO_RECVSTATE_OPEN) == 1){
447
+    msend_req_send_mark_init(s, m);
448
+    return;
449
+  }
450
+  lprintf(9, "%s: mark=%d %s\n", __func__, m->markcount, m->fn);
442 451
   m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
443 452
 }
444 453
 
... ...
@@ -446,8 +458,9 @@ static void msend_req_send_close_init(int s, mfile *m)
446 446
 {
447 447
   m->sendwait  = 1;
448 448
   m->initstate = 0;
449
-  ack_clear(m, MAKUO_RECVSTATE_OPEN);
450 449
   ack_clear(m, MAKUO_RECVSTATE_UPDATE);
450
+  ack_clear(m, MAKUO_RECVSTATE_OPEN);
451
+  ack_clear(m, MAKUO_RECVSTATE_MARK);
451 452
   msend_packet(s, &(m->mdata), &(m->addr));
452 453
 }
453 454
 
... ...
@@ -461,14 +474,16 @@ static void msend_req_send_close(int s, mfile *m)
461 461
     msend_packet(s, &(m->mdata), &(m->addr));
462 462
     return;
463 463
   }
464
+  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
465
+    msend_req_send_close_init(s, m);
466
+    return;
467
+  }
464 468
   if(ack_check(m, MAKUO_RECVSTATE_OPEN) == 1){
465
-    m->sendwait  = 1;
466
-    ack_clear(m, MAKUO_RECVSTATE_OPEN);
469
+    msend_req_send_close_init(s, m);
467 470
     return;
468 471
   }
469
-  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
470
-    m->sendwait  = 1;
471
-    ack_clear(m, MAKUO_RECVSTATE_UPDATE);
472
+  if(ack_check(m, MAKUO_RECVSTATE_MARK) == 1){
473
+    msend_req_send_close_init(s, m);
472 474
     return;
473 475
   }
474 476
   lprintf(9,"%s: %s\n", __func__, m->fn);