Browse code

--delete

Masanobu Yasui authored on 2008/12/06 04:32:08
Showing 7 changed files
... ...
@@ -11,6 +11,22 @@ int loop_flag   = 1;
11 11
 struct timeval curtime;
12 12
 BF_KEY EncKey;
13 13
 
14
+char *opcodestrlist[7]={"PING ",
15
+                        "EXIT ",
16
+                        "SEND ",
17
+                        "MD5  ",
18
+                        "DSYNC",
19
+                        "DEL  ",
20
+                        "UNKNOWN"};
21
+
22
+uint8_t opcodenumlist[7]={MAKUO_OP_PING,
23
+                          MAKUO_OP_EXIT,
24
+                          MAKUO_OP_SEND,
25
+                          MAKUO_OP_MD5,
26
+                          MAKUO_OP_DSYNC,
27
+                          MAKUO_OP_DEL,
28
+                          MAKUO_OPCODE_MAX};
29
+
14 30
 char *sstatestrlist[9]={"SEND_STAT",
15 31
                         "SEND_OPEN",
16 32
                         "SEND_DATA",
... ...
@@ -18,7 +34,7 @@ char *sstatestrlist[9]={"SEND_STAT",
18 18
                         "SEND_CLOSE",
19 19
                         "SEND_LAST",
20 20
                         "SEND_ERROR",
21
-                        "SEND_BREAK"
21
+                        "SEND_BREAK",
22 22
                         "SEND_UNKNOWN"};
23 23
 
24 24
 uint8_t sstatenumlist[9]={MAKUO_SENDSTATE_STAT,
... ...
@@ -39,6 +55,7 @@ char *rstatestrlist[16] = {"RECV_NONE",
39 39
                            "RECV_CLOSE",
40 40
                            "RECV_IGNORE",
41 41
                            "RECV_READONLY",
42
+                           "RECV_BREAK",
42 43
                            "RECV_MD5OK",
43 44
                            "RECV_MD5NG",
44 45
                            "RECV_OPENERR",
... ...
@@ -55,6 +72,7 @@ uint8_t rstatenumlist[16]={MAKUO_RECVSTATE_NONE,
55 55
                            MAKUO_RECVSTATE_CLOSE,
56 56
                            MAKUO_RECVSTATE_IGNORE,
57 57
                            MAKUO_RECVSTATE_READONLY,
58
+                           MAKUO_RECVSTATE_BREAK,
58 59
                            MAKUO_RECVSTATE_MD5OK,
59 60
                            MAKUO_RECVSTATE_MD5NG,
60 61
                            MAKUO_RECVSTATE_OPENERROR,
... ...
@@ -85,6 +103,17 @@ char *RSTATE(uint8_t n)
85 85
   return(rstatestrlist[i]);
86 86
 }
87 87
 
88
+char *OPCODE(uint8_t n)
89
+{
90
+  int i;
91
+  for(i=0;opcodenumlist[i] != MAKUO_STATE_MAX;i++){
92
+    if(opcodenumlist[i] == n){
93
+      break;
94
+    }
95
+  }
96
+  return(opcodestrlist[i]);
97
+}
98
+
88 99
 int md5sum(int fd, unsigned char *digest)
89 100
 {
90 101
   int  rd;
... ...
@@ -241,7 +270,8 @@ mfile *mfnew()
241 241
   if(m = (mfile *)malloc(sizeof(mfile))){
242 242
     memset(m, 0, sizeof(mfile));
243 243
     m->mdata.head.vproto = PROTOCOL_VERSION;
244
-    m->fd = -1;
244
+    m->fd   = -1;
245
+    m->pipe = -1;
245 246
     m->retrycnt = MAKUO_SEND_RETRYCNT;
246 247
     memcpy(&(m->addr), &(moption.maddr), sizeof(m->addr));
247 248
   }
... ...
@@ -257,8 +287,8 @@ mfile *mfadd(int n)
257 257
     }else{
258 258
       mfile *l;
259 259
       for(l=mftop[n];l->next;l=l->next);
260
-      l->next = (void *)m;
261
-      m->prev = (void *)l;
260
+      l->next = m;
261
+      m->prev = l;
262 262
       m->next = NULL;
263 263
     }
264 264
   }
... ...
@@ -704,7 +734,6 @@ int mremove(char *base, char *name)
704 704
     strcpy(path,name);
705 705
   }else{
706 706
     sprintf(path, "%s/%s", base, name);
707
-    lprintf(0, "%s: %s\n", __func__, path);
708 707
   }
709 708
   if(is_dir(path)){
710 709
     if(d = opendir(path)){
... ...
@@ -195,11 +195,6 @@ int ismsend(int s, mfile *m)
195 195
       return(0);
196 196
     }
197 197
   }
198
-  if(m->senddelay){
199
-    if(!mtimeout(&(m->lastsend), m->senddelay)){
200
-      return(1);
201
-    }
202
-  }
203 198
   r = ack_check(m, MAKUO_RECVSTATE_NONE);
204 199
   if(r == -1){
205 200
     m->mdata.head.seqno  = 0;
... ...
@@ -47,6 +47,7 @@
47 47
 #define MAKUO_HOSTNAME_MAX 255
48 48
 #define MAKUO_PARALLEL_MAX   8
49 49
 #define MAKUO_STATE_MAX    255
50
+#define MAKUO_OPCODE_MAX   255
50 51
 
51 52
 /*----- default -----*/
52 53
 #define MAKUO_LOCAL_ADDR  "127.0.0.1"
... ...
@@ -56,7 +57,6 @@
56 56
 /*----- timeout -----*/
57 57
 #define MAKUO_SEND_TIMEOUT  500    /* 再送間隔(ms)                                 */
58 58
 #define MAKUO_SEND_RETRYCNT 120    /* 再送回数                                     */
59
-#define MAKUO_SEND_DELAYSTP 0      /* 送出遅延時間の増分(ms)                       */
60 59
 #define MAKUO_PONG_TIMEOUT  180000 /* メンバから除外するまでの時間(ms)             */
61 60
 #define MAKUO_PONG_INTERVAL 45000  /* PING送信間隔(ms)                             */
62 61
 #define MAKUO_RECV_GCWAIT   180000 /* 消し損ねたオブジェクトを開放する待ち時間(ms) */
... ...
@@ -70,10 +70,12 @@
70 70
 #define MAKUO_OP_DEL   5
71 71
 
72 72
 /*----- flags -----*/
73
-#define MAKUO_FLAG_ACK   1
74
-#define MAKUO_FLAG_CRYPT 2
75
-#define MAKUO_FLAG_WAIT  4
76
-#define MAKUO_FLAG_FMARK 8
73
+#define MAKUO_FLAG_ACK    1
74
+#define MAKUO_FLAG_CRYPT  2
75
+#define MAKUO_FLAG_WAIT   4
76
+#define MAKUO_FLAG_FMARK  8
77
+#define MAKUO_FLAG_DRYRUN 16
78
+#define MAKUO_FLAG_RECURS 32
77 79
 
78 80
 /*----- sendstatus -----*/
79 81
 #define MAKUO_SENDSTATE_STAT       0  /* 更新確認待 */
... ...
@@ -94,6 +96,7 @@
94 94
 #define MAKUO_RECVSTATE_CLOSE      5
95 95
 #define MAKUO_RECVSTATE_IGNORE     6
96 96
 #define MAKUO_RECVSTATE_READONLY   7
97
+#define MAKUO_RECVSTATE_BREAK      8
97 98
 #define MAKUO_RECVSTATE_MD5OK      10
98 99
 #define MAKUO_RECVSTATE_MD5NG      11
99 100
 #define MAKUO_RECVSTATE_DELETEOK   12
... ...
@@ -181,13 +184,7 @@ typedef struct
181 181
   excludeitem *exclude;
182 182
 } mcomm;
183 183
 
184
-typedef struct
185
-{
186
-  DIR  *d;
187
-  void *m;
188
-} mdelete;
189
-
190
-typedef struct
184
+typedef struct mfile
191 185
 {
192 186
   int  fd;
193 187
   char fn[PATH_MAX];
... ...
@@ -199,7 +196,6 @@ typedef struct
199 199
   uint32_t retrycnt;
200 200
   uint32_t sendwait;
201 201
   uint32_t lickflag;
202
-  uint32_t senddelay;
203 202
   uint32_t initstate;
204 203
   uint32_t recvcount;
205 204
   uint32_t markdelta;
... ...
@@ -212,15 +208,15 @@ typedef struct
212 212
   int pid;
213 213
   int pipe;
214 214
   mdata mdata;
215
-  mdelete del;
216 215
   mcomm *comm;
217 216
   uint32_t *mark;
218
-  void  *prev;
219
-  void  *next;
220 217
   struct stat fs;
221 218
   struct sockaddr_in addr;
222 219
   struct timeval lastsend;
223 220
   struct timeval lastrecv;
221
+  struct mfile *prev;
222
+  struct mfile *next;
223
+  struct mfile *link;
224 224
 } mfile;
225 225
 
226 226
 typedef struct
... ...
@@ -521,8 +521,9 @@ int mexec_dsync(mcomm *c, int n)
521 521
     }
522 522
   }
523 523
 
524
-  while(optind < c->argc[n])
524
+  while(optind < c->argc[n]){
525 525
     fn = c->parse[n][optind++];
526
+  }
526 527
 
527 528
   /*----- help -----*/
528 529
   if(c->argc[n]<2){
... ...
@@ -540,21 +541,28 @@ int mexec_dsync(mcomm *c, int n)
540 540
     return(0);
541 541
 	}
542 542
 
543
-  sprintf(m->fn,".");
543
+  strcpy(m->fn, ".");
544 544
   if(fn){
545 545
     if(*fn != '/'){
546 546
 	    strcat(m->fn, "/");
547 547
     }
548 548
 	  strcat(m->fn, fn);
549 549
   }
550
+  
550 551
   strcpy(m->mdata.data, m->fn);
551 552
 	m->mdata.head.reqid  = getrid();
553
+	m->mdata.head.szdata = strlen(m->fn);
552 554
 	m->mdata.head.opcode = MAKUO_OP_DSYNC;
553 555
   m->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
554
-  m->mdata.head.szdata = strlen(m->fn);
555
-	m->comm      = c;
556
-  m->dryrun    = dryrun;
557
-  m->recurs    = recurs;
556
+	m->comm = c;
557
+  if(dryrun){
558
+    m->dryrun = 1;
559
+    m->mdata.head.flags |= MAKUO_FLAG_DRYRUN;
560
+  }
561
+  if(recurs){
562
+    m->recurs = 1;
563
+    m->mdata.head.flags |= MAKUO_FLAG_RECURS;
564
+  }
558 565
   m->initstate = 1;
559 566
 
560 567
   /*----- send to address set -----*/
... ...
@@ -707,7 +715,11 @@ int mexec_status(mcomm *c, int n)
707 707
     if(snow > smax){
708 708
       snow = smax;
709 709
     }
710
-    cprintf(0, c, "  %s %s (%u:%u/%u)\n", SSTATE(m->mdata.head.nstate), m->fn, m->markcount, snow, smax); 
710
+    cprintf(0, c, "  %s %s %s (%u:%u/%u)\n", 
711
+      OPCODE(m->mdata.head.opcode), 
712
+      SSTATE(m->mdata.head.nstate), 
713
+      m->fn, 
714
+      m->markcount, snow, smax); 
711 715
   }
712 716
 
713 717
   count = 0;
... ...
@@ -716,8 +728,13 @@ int mexec_status(mcomm *c, int n)
716 716
   cprintf(0, c, "recv file: %d\n", count);
717 717
   for(m=mftop[1];m;m=m->next){
718 718
     t = localtime(&(m->lastrecv.tv_sec));
719
-    cprintf(0, c, "  %s %02d:%02d:%02d %s (%d/%d) mark=%d\n",
720
-      RSTATE(m->mdata.head.nstate), t->tm_hour, t->tm_min, t->tm_sec, m->fn, m->recvcount, m->seqnomax, m->markcount); 
719
+    cprintf(0, c, "  %s %s %02d:%02d:%02d %s (%d/%d) mark=%d\n",
720
+      OPCODE(m->mdata.head.opcode), 
721
+      RSTATE(m->mdata.head.nstate), 
722
+      t->tm_hour, t->tm_min, t->tm_sec, 
723
+      m->fn, 
724
+      m->recvcount, m->seqnomax, 
725
+      m->markcount); 
721 726
   }
722 727
   return(0);
723 728
 }
... ...
@@ -312,26 +312,65 @@ static void mrecv_ack_dsync(mdata *data, struct sockaddr_in *addr)
312 312
 
313 313
 static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr)
314 314
 {
315
-  uint8_t *s;
316
-  mhost   *t;
317
-  mfile   *m;
315
+  uint8_t   *s;
316
+  mhost     *t;
317
+  mfile     *m;
318
+  uint32_t err;
319
+  uint16_t len;
320
+
318 321
   mrecv_ack_search(&t, &m, data, addr);
319 322
   if(!t || !m){
320 323
     return;
321 324
   }
322 325
   mtimeget(&m->lastrecv);
323
-  s = get_hoststate(t,m);
324
-  if(!s){
325
-    lprintf(0, "%s: not allocate state area\n", __func__);
326
+
327
+  if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
328
+    m->mdata.head.nstate = MAKUO_SENDSTATE_STAT;
329
+    s = get_hoststate(t,m);
330
+    if(!s){
331
+      lprintf(0, "%s: not allocate state area\n", __func__);
332
+      return;
333
+    }
334
+    *s = data->head.nstate;
326 335
     return;
327 336
   }
328
-  if(data->head.nstate == MAKUO_RECVSTATE_DELETEOK){
329
-    lprintf(0,"%s: delete OK %s\n", __func__, m->fn);
337
+
338
+  if(m->mdata.head.nstate == MAKUO_SENDSTATE_DATA){
339
+    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
340
+    return;
330 341
   }
331
-  if(data->head.nstate == MAKUO_RECVSTATE_DELETENG){
332
-    lprintf(0,"%s: delete NG %s\n", __func__, m->fn);
342
+
343
+  if(m->mdata.head.nstate == MAKUO_SENDSTATE_STAT){
344
+    if(data->head.nstate != MAKUO_RECVSTATE_DELETEOK){
345
+      s = get_hoststate(t,m);
346
+      if(!s){
347
+        lprintf(0, "%s: not allocate state area\n", __func__);
348
+        return;
349
+      }
350
+      *s = data->head.nstate;
351
+    }else{
352
+      err = 0;
353
+      len = strlen(m->fn);
354
+      m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
355
+      if(m->dryrun){
356
+        lprintf(1, "%s: (dryrun) delete %s\n", __func__, m->fn);
357
+      }else{
358
+        if(!mremove(NULL,m->fn)){
359
+          lprintf(1, "%s: delete %s\n", __func__, m->fn);
360
+        }else{
361
+          err = errno;
362
+          lprintf(0, "%s: delete error %s (%s)\n", __func__, m->fn, strerror(errno));
363
+        }
364
+      }
365
+      m->mdata.p = m->mdata.data;
366
+      *(uint32_t *)(m->mdata.p) = htonl(err);
367
+      m->mdata.p += sizeof(uint32_t);
368
+      *(uint16_t *)(m->mdata.p) = htons(len);
369
+      m->mdata.p += sizeof(uint16_t);
370
+      memcpy(m->mdata.p, m->fn, len);
371
+      m->mdata.head.szdata = sizeof(err) +  sizeof(len) + len; 
372
+    }
333 373
   }
334
-  *s = data->head.nstate;
335 374
 }
336 375
 
337 376
 static void mrecv_ack(mdata *data, struct sockaddr_in *addr)
... ...
@@ -380,7 +419,7 @@ static void mrecv_req_ping(mdata *data, struct sockaddr_in *addr)
380 380
   mfile *a;
381 381
   char buff[MAKUO_HOSTNAME_MAX + 1];
382 382
   member_add(&addr->sin_addr, data);
383
-  a = mkack(data,addr, MAKUO_RECVSTATE_NONE);
383
+  a = mkack(data, addr, MAKUO_RECVSTATE_NONE);
384 384
   if(gethostname(buff, sizeof(buff)) == -1){
385 385
     buff[0] = 0;
386 386
   }
... ...
@@ -984,22 +1023,31 @@ static void dsync_write(int fd, char *base, uint8_t state)
984 984
 {
985 985
   uint16_t len = strlen(base);
986 986
 
987
-  base += 2;
988
-  len  -= 2;
989
-  if(!len){
987
+  if(!loop_flag){
988
+    return;
989
+  }
990
+  if(!strcmp(base, ".")){
990 991
     return;
991 992
   }
993
+  if(!memcmp(base, "./", 2)){
994
+    base += 2;
995
+    len  -= 2;
996
+  }
992 997
   write(fd, &state, sizeof(state));
993 998
   write(fd, &len, sizeof(len));
994 999
   write(fd, base, len);
995 1000
 }
996 1001
 
997
-static void dsync_scan(int fd, char *base)
1002
+static void dsync_scan(int fd, char *base, int recurs)
998 1003
 {
999 1004
   DIR *d;
1000 1005
   struct stat st;
1001 1006
   struct dirent *dent;
1002 1007
   char path[PATH_MAX];
1008
+
1009
+  if(!loop_flag){
1010
+    return;
1011
+  }
1003 1012
   if(lstat(base,&st) == -1){
1004 1013
     dsync_write(fd, base, MAKUO_SENDSTATE_ERROR);
1005 1014
     return;
... ...
@@ -1017,12 +1065,18 @@ static void dsync_scan(int fd, char *base)
1017 1017
     dsync_write(fd, base, MAKUO_SENDSTATE_ERROR);
1018 1018
   }else{
1019 1019
     while(dent=readdir(d)){
1020
+      if(!loop_flag)
1021
+        break;
1020 1022
       if(!strcmp(dent->d_name, "."))
1021 1023
         continue;
1022 1024
       if(!strcmp(dent->d_name, ".."))
1023 1025
         continue;
1024 1026
       sprintf(path, "%s/%s", base, dent->d_name);
1025
-      dsync_scan(fd, path);
1027
+      if(recurs){
1028
+        dsync_scan(fd, path, recurs);
1029
+      }else{
1030
+        dsync_write(fd, path, MAKUO_SENDSTATE_STAT);
1031
+      }
1026 1032
     }
1027 1033
     closedir(d);
1028 1034
     dsync_write(fd, base, MAKUO_SENDSTATE_STAT);
... ...
@@ -1036,68 +1090,95 @@ static void mrecv_req_dsync_open(mfile *m, mdata *data, struct sockaddr_in *addr
1036 1036
   mfile *d;
1037 1037
   char path[PATH_MAX];
1038 1038
 
1039
+  lprintf(9, "%s:\n", __func__);
1039 1040
   mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1040
-  if(!m){
1041
-    m = mfadd(1);
1042
-    m->mdata.head.opcode = data->head.opcode;
1043
-    m->mdata.head.reqid  = data->head.reqid;
1044
-    m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
1045
-    memcpy(&(m->addr), addr, sizeof(m->addr));
1046
-    if(data->head.szdata){
1047
-      memcpy(m->fn, data->data, data->head.szdata);
1048
-    }
1049
-    m->fn[data->head.szdata] = 0;
1050
-
1051
-    /* 走査 */
1052
-    d = mfins(0);
1053
-    d->del.m = (void *)m;
1054
-    m->del.m = (void *)d;
1055
-    d->mdata.head.opcode = MAKUO_OP_DEL;
1056
-    d->mdata.head.reqid  = getrid();
1057
-    d->mdata.head.seqno  = data->head.reqid;
1058
-    d->mdata.head.nstate = MAKUO_SENDSTATE_STAT;
1059
-    memcpy(&(d->addr), addr, sizeof(d->addr));
1060
-    memcpy(&(d->fn), data->data, data->head.szdata);
1061
-    d->fn[data->head.szdata] = 0;
1062
-
1063
-    /* fork */
1064
-    pipe(p);
1065
-    pid = fork();
1066
-    if(pid == -1){
1067
-      close(p[0]);
1068
-      close(p[1]);
1069
-      lprintf(0, "%s: fork error\n", __func__);
1070
-      return;
1071
-    }
1072
-    if(pid){
1073
-      /* parent */
1074
-      d->pid  = pid;
1075
-      d->pipe = p[0];
1076
-      close(p[1]); /* write close */
1077
-    }else{
1078
-      /* child */
1079
-      close(p[0]); /* read close */
1080
-      dsync_scan(p[1], d->fn);
1081
-      close(p[1]);
1082
-      _exit(0);
1083
-    }  
1041
+  if(m){
1042
+    return;
1084 1043
   }
1044
+  m = mfadd(1);
1045
+  m->mdata.head.opcode = data->head.opcode;
1046
+  m->mdata.head.reqid  = data->head.reqid;
1047
+  m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
1048
+  memcpy(&(m->addr), addr, sizeof(m->addr));
1049
+  if(data->head.szdata){
1050
+    memcpy(m->fn, data->data, data->head.szdata);
1051
+  }
1052
+  m->fn[data->head.szdata] = 0;
1053
+
1054
+  /* 走査 */
1055
+  d = mfins(0);
1056
+  d->link = (void *)m;
1057
+  m->link = (void *)d;
1058
+  d->mdata.head.opcode = MAKUO_OP_DEL;
1059
+  d->mdata.head.reqid  = getrid();
1060
+  d->mdata.head.flags  = data->head.flags;
1061
+  d->mdata.head.seqno  = data->head.reqid;
1062
+  d->mdata.head.nstate = MAKUO_SENDSTATE_STAT;
1063
+  memcpy(&(d->addr), addr, sizeof(d->addr));
1064
+  memcpy(&(d->fn), data->data, data->head.szdata);
1065
+  d->fn[data->head.szdata] = 0;
1066
+  if(d->mdata.head.flags & MAKUO_FLAG_RECURS){
1067
+    d->recurs = 1;
1068
+  }
1069
+  if(d->mdata.head.flags & MAKUO_FLAG_DRYRUN){
1070
+    d->dryrun = 1;
1071
+  }
1072
+
1073
+  /* fork */
1074
+  pipe(p);
1075
+  pid = fork();
1076
+  if(pid == -1){
1077
+    lprintf(0, "%s: fork error (%s)\n", __func__, strerror(errno));
1078
+    close(p[0]);
1079
+    close(p[1]);
1080
+    return;
1081
+  }
1082
+  if(pid){
1083
+    /* parent */
1084
+    d->pid  = pid;
1085
+    d->pipe = p[0];
1086
+    close(p[1]); /* write close */
1087
+  }else{
1088
+    /* child */
1089
+    close(p[0]); /* read close */
1090
+    dsync_scan(p[1], d->fn, d->recurs);
1091
+    close(p[1]);
1092
+    _exit(0);
1093
+  }  
1085 1094
 }
1086 1095
 
1087 1096
 static void mrecv_req_dsync_last(mfile *m, mdata *data, struct sockaddr_in *addr)
1088 1097
 {
1089
-  lprintf(0,"%s:\n",__func__);
1090
-  mfile *d = m->del.m;
1091
-  d->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
1092
-  d->sendwait = 0;
1098
+  lprintf(9, "%s:\n", __func__);
1099
+  if(!m){
1100
+    return;
1101
+  }
1102
+  if(m->link){
1103
+    m->link->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
1104
+    m->link->sendwait = 0;
1105
+  }
1093 1106
   mrecv_mfdel(m); 
1094 1107
 }
1095 1108
 
1109
+static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *addr)
1110
+{
1111
+  lprintf(9, "%s:\n", __func__);
1112
+  if(m){
1113
+    if(m->link){
1114
+      m->link->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
1115
+      m->link->sendwait = 0;
1116
+    }
1117
+    mrecv_mfdel(m);
1118
+  }
1119
+  mkack(data, addr, MAKUO_RECVSTATE_BREAK);
1120
+}
1121
+
1096 1122
 /*
1097 1123
  *  dsync
1098 1124
  */
1099 1125
 static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr)
1100 1126
 {
1127
+  lprintf(9, "%s:\n", __func__);
1101 1128
   mfile *m = mrecv_req_search(data, addr);
1102 1129
   switch(data->head.nstate){
1103 1130
     case MAKUO_SENDSTATE_OPEN:
... ...
@@ -1106,39 +1187,109 @@ static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr)
1106 1106
     case MAKUO_SENDSTATE_LAST:
1107 1107
       mrecv_req_dsync_last(m, data, addr);
1108 1108
       break;
1109
+    case MAKUO_SENDSTATE_BREAK:
1110
+      mrecv_req_dsync_break(m, data, addr);
1111
+      break;
1109 1112
   }
1110 1113
 }
1111 1114
 
1112 1115
 /*
1113 1116
  *  del
1114 1117
  */
1115
-static void mrecv_req_del(mdata *data, struct sockaddr_in *addr)
1118
+static void mrecv_req_del_stat(mdata *data, struct sockaddr_in *addr)
1116 1119
 {
1117
-  uint8_t stat;
1118 1120
   uint16_t len;
1119
-
1121
+  char *hn = "unknown host";
1120 1122
   mhost *t = member_get(addr);
1121
-  if(!t){
1122
-    return;
1123
-  }
1124 1123
   mfile *a = mkack(data, addr, MAKUO_RECVSTATE_DELETENG);
1124
+  if(t){
1125
+    hn = t->hostname;
1126
+  }
1125 1127
   data->p = data->data;
1126
-  stat = *(uint8_t *)(data->p);
1127
-  data->p += sizeof(uint8_t);
1128
-  len  = ntohs(*(uint16_t *)(data->p));
1128
+  len = ntohs(*(uint16_t *)(data->p));
1129 1129
   data->p += sizeof(uint16_t);
1130 1130
   memcpy(a->fn, data->p, len);
1131 1131
   a->fn[len] = 0;
1132 1132
   if(lstat(a->fn, &(a->fs)) == -1 && errno == ENOENT){
1133
-    mfile *m;
1134
-    for(m=mftop[0];m;m=m->next){
1135
-      if(m->mdata.head.reqid == data->head.seqno){
1136
-        break;
1137
-      } 
1138
-    }
1139 1133
     a->mdata.head.nstate = MAKUO_RECVSTATE_DELETEOK;
1140
-    lprintf(1, "%s: delete %s:%s\n", __func__, t->hostname, a->fn);
1141
-    cprintf(0, m->comm, "delete %s:%s\n", t->hostname, a->fn);
1134
+    lprintf(1, "%s: delete %s:%s\n", __func__, hn, a->fn);
1135
+  }
1136
+}
1137
+
1138
+static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr)
1139
+{
1140
+  uint32_t err;
1141
+  uint16_t len;
1142
+  char *hn = "unknown host";
1143
+  mhost *t = member_get(addr);
1144
+  mcomm *c = NULL;
1145
+  mfile *m = mrecv_req_search(data, addr);
1146
+  mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN);
1147
+
1148
+  if(m){
1149
+    mtimeget(&(m->lastrecv));
1150
+    return;
1151
+  }
1152
+  if(t){
1153
+    hn = t->hostname;
1154
+  }
1155
+  m = mfadd(1);
1156
+  m->mdata.head.opcode = data->head.opcode;
1157
+  m->mdata.head.reqid  = data->head.reqid;
1158
+  m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN;
1159
+  memcpy(&(m->addr), addr, sizeof(m->addr));
1160
+  mtimeget(&(m->lastrecv));
1161
+  if(data->head.flags & MAKUO_FLAG_DRYRUN){
1162
+    m->dryrun = 1;
1163
+  }
1164
+  data->p = data->data;
1165
+  err = ntohl(*(uint32_t *)(data->p));
1166
+  data->p += sizeof(uint32_t);
1167
+  len = ntohs(*(uint16_t *)(data->p));
1168
+  data->p += sizeof(uint16_t);
1169
+  memcpy(m->fn, data->p, len);
1170
+  m->fn[len] = 0;
1171
+
1172
+  for(a=mftop[0];a;a=a->next){
1173
+    if((a->mdata.head.reqid == data->head.seqno) && (a->comm != NULL)){
1174
+      c = a->comm;
1175
+      break;
1176
+    } 
1177
+  }
1178
+  if(m->dryrun){
1179
+    cprintf(0, c, "(dryrun) delete %s:%s\n", hn, m->fn);
1180
+  }else{
1181
+    if(err){
1182
+      cprintf(0, c,  "(%s) delete error %s:%s\n", strerror(err), hn, m->fn);
1183
+    }else{
1184
+      cprintf(0, c,  "delete %s:%s\n", hn, m->fn);
1185
+    }
1186
+  }
1187
+}
1188
+
1189
+static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr)
1190
+{
1191
+  mfile *m = mrecv_req_search(data, addr);
1192
+  mfile *a = mkack(data, addr, MAKUO_RECVSTATE_CLOSE);
1193
+  if(!m){
1194
+    return;
1195
+  }
1196
+  mrecv_mfdel(m);
1197
+}
1198
+
1199
+static void mrecv_req_del(mdata *data, struct sockaddr_in *addr)
1200
+{
1201
+  lprintf(9, "%s:\n", __func__);
1202
+  switch(data->head.nstate){
1203
+    case MAKUO_SENDSTATE_STAT:  /* 存在確認 */
1204
+      mrecv_req_del_stat(data, addr);
1205
+      break;
1206
+    case MAKUO_SENDSTATE_DATA:  /* 結果通知 */
1207
+      mrecv_req_del_data(data, addr);
1208
+      break;
1209
+    case MAKUO_SENDSTATE_CLOSE: /* 結果開放 */
1210
+      mrecv_req_del_close(data, addr);
1211
+      break;
1142 1212
   }
1143 1213
 }
1144 1214
 
... ...
@@ -1163,9 +1314,9 @@ static void mrecv_req(mdata *data, struct sockaddr_in *addr)
1163 1163
     case MAKUO_OP_DEL:
1164 1164
       mrecv_req_del(data, addr);
1165 1165
       break;
1166
-    /*----- 機能追加はこの上 -----*/
1167 1166
     default:
1168 1167
       mkack(data, addr, MAKUO_RECVSTATE_IGNORE);
1169 1168
       break;
1170 1169
   }
1171 1170
 }
1171
+
... ...
@@ -15,13 +15,23 @@ static void msend_req(int s, mfile *m);
15 15
 static mfile *msend_mfdel(mfile *m)
16 16
 {
17 17
   mfile *r;
18
-  if(!m)
18
+  if(!m){
19 19
     return(NULL);
20
+  }
20 21
   r = m->next;
21
-  if(m->fd != -1)
22
-    close(m->fd);   
23
-  if(m->mark)
22
+  if(m->fd != -1){
23
+    close(m->fd);
24
+  }
25
+  if(m->pipe != -1){
26
+    close(m->pipe);
27
+  }
28
+  if(m->pid){
29
+    kill(m->pid, SIGTERM);
30
+    waitpid(m->pid, NULL, 0);
31
+  }
32
+  if(m->mark){
24 33
     free(m->mark);
34
+  }
25 35
   clr_hoststate(m);
26 36
   mfdel(m);
27 37
   return(r);
... ...
@@ -74,8 +84,8 @@ static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
74 74
         if(errno == EINTR){
75 75
           continue;
76 76
         }else{
77
-          lprintf(0,"%s: send error errno=%d sock=%d op=%d rid=%d state=%s size=%d seqno=%d\n", __func__,
78
-            errno, s, data->head.opcode, data->head.reqid, SSTATE(data->head.nstate), sizeof(mhead) + szdata, data->head.seqno);
77
+          lprintf(0,"%s: send error (%s) sock=%d op=%d rid=%d state=%s size=%d seqno=%d\n", __func__,
78
+            strerror(errno), s, data->head.opcode, data->head.reqid, SSTATE(data->head.nstate), sizeof(mhead) + szdata, data->head.seqno);
79 79
           return(-1);
80 80
         }
81 81
       }
... ...
@@ -657,6 +667,7 @@ static void msend_req_md5(int s, mfile *m)
657 657
 
658 658
 static void msend_req_dsync_open_init(int s, mfile *m)
659 659
 {
660
+  lprintf(9, "%s:\n", __func__);
660 661
   m->sendwait  = 1;
661 662
   m->initstate = 0;
662 663
   msend_packet(s, &(m->mdata), &(m->addr));
... ...
@@ -664,6 +675,7 @@ static void msend_req_dsync_open_init(int s, mfile *m)
664 664
 
665 665
 static void msend_req_dsync_open(int s, mfile *m)
666 666
 {
667
+  lprintf(9, "%s:\n", __func__);
667 668
   if(m->initstate){
668 669
     msend_req_dsync_open_init(s, m);
669 670
     return;
... ...
@@ -678,6 +690,7 @@ static void msend_req_dsync_open(int s, mfile *m)
678 678
 
679 679
 static void msend_req_dsync_close_init(int s, mfile *m)
680 680
 {
681
+  lprintf(9, "%s:\n", __func__);
681 682
   m->sendwait  = 1;
682 683
   m->initstate = 0;
683 684
   ack_clear(m, MAKUO_RECVSTATE_OPEN);
... ...
@@ -685,6 +698,7 @@ static void msend_req_dsync_close_init(int s, mfile *m)
685 685
 
686 686
 static void msend_req_dsync_close(int s, mfile *m)
687 687
 {
688
+  lprintf(9, "%s:\n", __func__);
688 689
   if(m->initstate){
689 690
     msend_req_dsync_close_init(s, m);
690 691
     return;
... ...
@@ -696,9 +710,42 @@ static void msend_req_dsync_close(int s, mfile *m)
696 696
   msend_mfdel(m);
697 697
 }
698 698
 
699
+static void msend_req_dsync_break_init(int s, mfile *m)
700
+{
701
+  lprintf(9, "%s:\n", __func__);
702
+  m->sendwait  = 1;
703
+  m->initstate = 0;
704
+  ack_clear(m, -1);
705
+  msend_packet(s, &(m->mdata), &(m->addr));
706
+}
707
+
708
+static void msend_req_dsync_break(int s, mfile *m)
709
+{
710
+  lprintf(9, "%s:\n", __func__);
711
+  if(m->initstate){
712
+    msend_req_dsync_break_init(s, m);
713
+    return;
714
+  }
715
+  if(m->sendwait){
716
+    msend_packet(s, &(m->mdata), &(m->addr));
717
+    return;
718
+  }
719
+  lprintf(9,"%s: m=%p\n", __func__, m);
720
+  msend_mfdel(m);
721
+}
722
+
699 723
 /*----- dsync -----*/
700 724
 static void msend_req_dsync(int s, mfile *m)
701 725
 {
726
+  lprintf(9, "%s:\n", __func__);
727
+  if(m->mdata.head.nstate != MAKUO_SENDSTATE_LAST){
728
+    if(!m->comm){
729
+      if(m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK){
730
+        m->initstate = 1;
731
+        m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
732
+      }
733
+    }
734
+  }
702 735
   switch(m->mdata.head.nstate){
703 736
     case MAKUO_SENDSTATE_OPEN:
704 737
       msend_req_dsync_open(s, m);
... ...
@@ -706,16 +753,19 @@ static void msend_req_dsync(int s, mfile *m)
706 706
     case MAKUO_SENDSTATE_CLOSE:
707 707
       msend_req_dsync_close(s, m);
708 708
       break;
709
+    case MAKUO_SENDSTATE_BREAK:
710
+      msend_req_dsync_break(s, m);
711
+      break;
709 712
     case MAKUO_SENDSTATE_LAST:
710 713
       msend_shot(s, m);
711 714
       break;
712 715
   }
713 716
 }
714 717
 
715
-static void msend_req_del_close(int s, mfile *m)
718
+static void msend_req_del_mark(int s, mfile *m)
716 719
 {
717
-  lprintf(0,"%s:\n",__func__);
718
-  mfile *d = m->del.m; /* dsync object */
720
+  lprintf(9, "%s:\n", __func__);
721
+  mfile *d = m->link; /* dsync object */
719 722
   mkack(&(d->mdata), &(d->addr), MAKUO_RECVSTATE_CLOSE);
720 723
   ack_clear(m, -1);
721 724
 }
... ...
@@ -723,18 +773,10 @@ static void msend_req_del_close(int s, mfile *m)
723 723
 static void msend_req_del_init(int s, mfile *m)
724 724
 {
725 725
   int r;
726
-  uint16_t len;
727 726
   uint8_t stat;
727
+  uint16_t len;
728 728
 
729
-  if(m->pipe == -1){
730
-    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
731
-    msend_req_del_close(s, m);
732
-    m->sendwait  = 1;
733
-    m->initstate = 0;
734
-    ack_clear(m, -1);
735
-    return;
736
-  }
737
-
729
+  lprintf(9, "%s: \n", __func__);
738 730
   r = read(m->pipe, &stat, sizeof(stat));
739 731
   if(r <= 0){
740 732
     /* eof */
... ...
@@ -743,7 +785,7 @@ static void msend_req_del_init(int s, mfile *m)
743 743
       m->pipe = -1;
744 744
       m->pid  =  0;
745 745
     }
746
-    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
746
+    m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
747 747
     m->sendwait  = 1;
748 748
     m->initstate = 0;
749 749
     ack_clear(m, -1);
... ...
@@ -753,14 +795,13 @@ static void msend_req_del_init(int s, mfile *m)
753 753
   read(m->pipe, &len, sizeof(len));
754 754
   read(m->pipe, m->fn, len);
755 755
   m->fn[len] = 0;
756
+  lprintf(9, "%s: fn=%s\n", __func__, m->fn);
756 757
 
757 758
   m->mdata.p = m->mdata.data;
758
-  *(uint8_t  *)(m->mdata.p) = stat;
759
-  m->mdata.p += sizeof(uint8_t);
760 759
   *(uint16_t *)(m->mdata.p) = htons(len);
761 760
   m->mdata.p += sizeof(uint16_t);
762 761
   memcpy(m->mdata.p, m->fn, len);
763
-  m->mdata.head.szdata = sizeof(stat) + sizeof(len) + len; 
762
+  m->mdata.head.szdata = sizeof(len) + len; 
764 763
   m->sendwait  = 1;
765 764
   m->initstate = 0;
766 765
   ack_clear(m, -1);
... ...
@@ -770,6 +811,11 @@ static void msend_req_del_init(int s, mfile *m)
770 770
 /*----- del -----*/
771 771
 static void msend_req_del(int s, mfile *m)
772 772
 {
773
+  lprintf(9, "%s:\n", __func__);
774
+  if(m->mdata.head.nstate == MAKUO_SENDSTATE_BREAK){
775
+    msend_mfdel(m);
776
+    return;
777
+  }
773 778
   if(m->mdata.head.nstate == MAKUO_SENDSTATE_LAST){
774 779
     msend_mfdel(m);
775 780
     return;
... ...
@@ -779,8 +825,8 @@ static void msend_req_del(int s, mfile *m)
779 779
     return;
780 780
   }
781 781
   if(m->sendwait){
782
-    if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
783
-      msend_req_del_close(s, m);
782
+    if(m->mdata.head.nstate == MAKUO_SENDSTATE_MARK){
783
+      msend_req_del_mark(s, m);
784 784
     }else{
785 785
       msend_packet(s, &(m->mdata), &(m->addr));
786 786
     }
... ...
@@ -608,6 +608,15 @@ int main(int argc, char *argv[])
608 608
       }
609 609
     }
610 610
   }else{
611
+    sprintf(cmd, "%s%s", mcmd, mopt);
612
+    for(i=optind;i<argc;i++){
613
+      strcat(cmd, " ");
614
+      strcat(cmd, argv[i]);
615
+    }
616
+    if(makuo_exec(s, cmd)){
617
+      close(s);
618
+      return(1);
619
+    }
611 620
     if(delflag){
612 621
       sprintf(cmd, "dsync%s", mopt);
613 622
       for(i=optind;i<argc;i++){
... ...
@@ -619,15 +628,6 @@ int main(int argc, char *argv[])
619 619
         return(1);
620 620
       }
621 621
     }
622
-    sprintf(cmd, "%s%s", mcmd, mopt);
623
-    for(i=optind;i<argc;i++){
624
-      strcat(cmd, " ");
625
-      strcat(cmd, argv[i]);
626
-    }
627
-    if(makuo_exec(s, cmd)){
628
-      close(s);
629
-      return(1);
630
-    }
631 622
   }
632 623
   return(makuo_quit(s));
633 624
 }