Browse code

speed up

Masanobu Yasui authored on 2008/12/12 02:50:34
Showing 7 changed files
... ...
@@ -206,12 +206,18 @@ void fdprintf(int s, char *fmt, ...)
206 206
 void lprintf(int l, char *fmt, ...)
207 207
 {
208 208
   va_list arg;
209
+  struct timeval tv;
209 210
   char msg[2048];
210 211
   if(moption.loglevel >= l){
211 212
     va_start(arg, fmt);
212 213
     vsprintf(msg, fmt, arg);
213 214
     va_end(arg);
215
+#ifdef MAKUO_DEBUG
216
+    gettimeofday(&tv, NULL);
217
+    fprintf(stderr, "%02d.%06d %s", tv.tv_sec % 60, tv.tv_usec, msg);
218
+#else
214 219
     fprintf(stderr, "%s", msg);
220
+#endif
215 221
     syslog(LOG_ERR, "%s: %s", moption.user_name, msg);
216 222
   }
217 223
 }
... ...
@@ -233,7 +239,7 @@ void cprintf(int l, mcomm *c, char *fmt, ...)
233 233
   }
234 234
 }
235 235
 
236
-void mprintf(char *func, mfile *m)
236
+void mprintf(const char *func, mfile *m)
237 237
 {
238 238
   char *st;
239 239
   char *op;
... ...
@@ -131,17 +131,10 @@ int mcomm_accept(mcomm *c, fd_set *fds, int s)
131 131
   return(0);
132 132
 }
133 133
 
134
-int mcomm_read(mcomm *c, fd_set *fds){
134
+void mcomm_check(mcomm *c){
135 135
   int i, j;
136 136
   mfile *m;
137 137
   for(i=0;i<MAX_COMM;i++){
138
-    for(j=0;j<2;j++){
139
-      if(c[i].fd[j] != -1){
140
-        if(FD_ISSET(c[i].fd[j], fds) || c[i].check[j]){
141
-          mexec(&c[i], j);
142
-        }
143
-      }
144
-    }
145 138
     if(c[i].fd[1] == -1){
146 139
       for(m=mftop[0];m;m=m->next){
147 140
         if(m->comm == &c[i]){
... ...
@@ -155,6 +148,20 @@ int mcomm_read(mcomm *c, fd_set *fds){
155 155
       }
156 156
     }
157 157
   }
158
+}
159
+
160
+int mcomm_read(mcomm *c, fd_set *fds){
161
+  int i, j;
162
+  mfile *m;
163
+  for(i=0;i<MAX_COMM;i++){
164
+    for(j=0;j<2;j++){
165
+      if(c[i].fd[j] != -1){
166
+        if(FD_ISSET(c[i].fd[j], fds) || c[i].check[j]){
167
+          mexec(&c[i], j);
168
+        }
169
+      }
170
+    }
171
+  }
158 172
   return(0);
159 173
 }
160 174
 
... ...
@@ -185,8 +192,8 @@ int mcomm_fdset(mcomm *c, fd_set *fds)
185 185
 }
186 186
 
187 187
 int mfdirchk(mfile *d){
188
-  int len = strlen(d->fn);
189 188
   mfile *m;
189
+  int len = strlen(d->fn);
190 190
   if(!len){
191 191
     return(1);
192 192
   }
... ...
@@ -243,6 +250,9 @@ int ismsend(int s, mfile *m)
243 243
 /***** main loop *****/
244 244
 int mloop()
245 245
 {
246
+  int para;
247
+  mfile *n;
248
+  mfile *m;
246 249
   fd_set rfds;
247 250
   fd_set wfds;
248 251
   struct timeval *lastpong;
... ...
@@ -251,46 +261,41 @@ int mloop()
251 251
   gettimeofday(&curtime,NULL);
252 252
   lastpong = pingpong(0);
253 253
   while(loop_flag){
254
-    tv.tv_sec  = 1;
255
-    tv.tv_usec = 0;
256
-    FD_ZERO(&rfds);
257
-    FD_ZERO(&wfds);
258
-
259 254
     gettimeofday(&curtime,NULL);
260 255
     if(mtimeout(lastpong, MAKUO_PONG_INTERVAL))
261 256
       lastpong = pingpong(1);
262
-
263
-    mcomm_fdset(moption.comm, &rfds);
257
+    m = mftop[0];
258
+    while(mrecv(moption.mcsocket)){
259
+      if(m != mftop[0]){
260
+        break;
261
+      }
262
+    }
263
+    para = 0;
264
+    n = NULL;
265
+    m = mftop[0];
266
+    while(m){
267
+      n = m->next;
268
+      para += ismsend(moption.mcsocket, m);
269
+      m = n;
270
+      if(para == moption.parallel){
271
+        break;
272
+      }
273
+    }
274
+    mcomm_check(moption.comm);
275
+    FD_ZERO(&rfds);
276
+    FD_ZERO(&wfds);
264 277
     FD_SET(moption.mcsocket,  &rfds);
278
+    mcomm_fdset(moption.comm, &rfds);
265 279
     if(mftop[0]){
266
-      tv.tv_sec  = 0;
267
-      tv.tv_usec = 10000;
268 280
       FD_SET(moption.mcsocket, &wfds);
269 281
     }
270
-
271
-    if(select(1024, &rfds, &wfds, NULL, &tv) < 0)
282
+    tv.tv_sec  = 1;
283
+    tv.tv_usec = 0;
284
+    if(select(1024, &rfds, &wfds, NULL, &tv) == -1)
272 285
       continue;
273
-
274
-    gettimeofday(&curtime,NULL);
275
-    if(FD_ISSET(moption.mcsocket,&wfds)){
276
-      int para = 0;
277
-      mfile *n = NULL;
278
-      mfile *m = mftop[0];
279
-      while(m){
280
-        n = m->next;
281
-        para += ismsend(moption.mcsocket, m);
282
-        m = n;
283
-        if(para == moption.parallel){
284
-          break;
285
-        }
286
-      }
287
-    }
288
-    if(FD_ISSET(moption.mcsocket,&rfds))
289
-      mrecv(moption.mcsocket);
290
-
291
-    mrecv_gc();
292 286
     mcomm_accept(moption.comm, &rfds, moption.lisocket); /* new console  */
293 287
     mcomm_read(moption.comm, &rfds);                     /* command exec */
288
+    mrecv_gc();
294 289
   }
295 290
   return(0);
296 291
 }
... ...
@@ -8,6 +8,7 @@
8 8
 #define PROTOCOL_VERSION 4
9 9
 #define _GNU_SOURCE
10 10
 #define _FILE_OFFSET_BITS 64
11
+#define MAKUO_DEBUG
11 12
 #include <stdio.h>
12 13
 #include <unistd.h>
13 14
 #include <stdlib.h>
... ...
@@ -55,8 +56,8 @@
55 55
 #define MAKUO_MCAST_PORT  5000
56 56
 
57 57
 /*----- timeout -----*/
58
-#define MAKUO_SEND_TIMEOUT  500    /* 再送間隔(ms)                                 */
59
-#define MAKUO_SEND_RETRYCNT 120    /* 再送回数                                     */
58
+#define MAKUO_SEND_TIMEOUT  250    /* 再送間隔(ms)                                 */
59
+#define MAKUO_SEND_RETRYCNT 240    /* 再送回数                                     */
60 60
 #define MAKUO_PONG_TIMEOUT  180000 /* メンバから除外するまでの時間(ms)             */
61 61
 #define MAKUO_PONG_INTERVAL 45000  /* PING送信間隔(ms)                             */
62 62
 #define MAKUO_RECV_GCWAIT   180000 /* 消し損ねたオブジェクトを開放する待ち時間(ms) */
... ...
@@ -222,8 +223,8 @@ typedef struct mfile
222 222
 
223 223
 typedef struct
224 224
 {
225
-  uint8_t state[MAKUO_PARALLEL_MAX];
226
-  mfile *mflist[MAKUO_PARALLEL_MAX];
225
+  uint8_t state[MAKUO_PARALLEL_MAX * MAX_COMM];
226
+  mfile *mflist[MAKUO_PARALLEL_MAX * MAX_COMM];
227 227
   char hostname[MAKUO_HOSTNAME_MAX];
228 228
   char version[32];
229 229
   struct in_addr ad;
... ...
@@ -278,7 +279,7 @@ extern BF_KEY EncKey;
278 278
 char    *SSTATE(uint8_t n);
279 279
 char    *RSTATE(uint8_t n);
280 280
 char    *OPCODE(uint8_t n);
281
-void     mprintf(char *func, mfile *m);
281
+void     mprintf(const char *func, mfile *m);
282 282
 void     lprintf(int l, char *fmt, ...);
283 283
 void     cprintf(int l, mcomm *c, char *fmt, ...);
284 284
 void     fdprintf(int s, char *fmt, ...);
... ...
@@ -293,7 +294,7 @@ mfile   *mkack(mdata *data, struct sockaddr_in *addr, uint8_t state);
293 293
 mhost   *member_get(struct in_addr *addr);
294 294
 mhost   *member_add(struct in_addr *addr, mdata *recvdata);
295 295
 void     member_del(mhost *h);
296
-void     mrecv(int s);
296
+int      mrecv(int s);
297 297
 void     msend(int s, mfile *m);
298 298
 void     set_filestat(char *path, uid_t uid, gid_t gid, mode_t mode);
299 299
 int      set_guid(int uid, int gid, gid_t *gids);
... ...
@@ -257,8 +257,9 @@ int mexec_send(mcomm *c, int n, int sync)
257 257
   char *argv[9];
258 258
   char *fn = NULL;
259 259
   mfile *m = NULL;
260
-  mhost *h = NULL;
261
-  int recursive = 0;
260
+  mhost *t = NULL;
261
+  int dryrun = 0;
262
+  int recurs = 0;
262 263
   int mode = MAKUO_MEXEC_SEND;
263 264
 
264 265
   if(moption.dontsend){
... ...
@@ -272,16 +273,17 @@ int mexec_send(mcomm *c, int n, int sync)
272 272
   while((i=getopt(c->argc[n], argv, "t:nr")) != -1){
273 273
     switch(i){
274 274
       case 'n':
275
+        dryrun = 1;
275 276
         mode = MAKUO_MEXEC_DRY;
276 277
         break;
277 278
       case 'r':
278
-        recursive = 1;
279
+        recurs = 1;
279 280
         break;
280 281
       case 't':
281
-        for(h=members;h;h=h->next)
282
-          if(!strcmp(h->hostname, optarg))
282
+        for(t=members;t;t=t->next)
283
+          if(!strcmp(t->hostname, optarg))
283 284
             break;
284
-        if(!h){
285
+        if(!t){
285 286
           cprintf(0, c, "%s is not contained in members\r\n", optarg);
286 287
           return(0);
287 288
         }
... ...
@@ -296,12 +298,12 @@ int mexec_send(mcomm *c, int n, int sync)
296 296
     fn = c->parse[n][optind++];
297 297
 
298 298
   /*----- directory scan -----*/
299
-  if(recursive){
299
+  if(recurs){
300 300
     if(c->cpid){
301 301
       cprintf(0, c, "recursive process active now!\n");
302 302
       return(0);
303 303
     }
304
-    return(mexec_scan(c, fn, h, mode));
304
+    return(mexec_scan(c, fn, t, mode));
305 305
   }
306 306
   /*----- help -----*/
307 307
   if(!fn){
... ...
@@ -326,9 +328,9 @@ int mexec_send(mcomm *c, int n, int sync)
326 326
 	}
327 327
 
328 328
   /*----- send to address set -----*/
329
-  if(h){
329
+  if(t){
330 330
     m->sendto = 1;
331
-    memcpy(&(m->addr.sin_addr), &(h->ad), sizeof(m->addr.sin_addr));
331
+    memcpy(&(m->addr.sin_addr), &(t->ad), sizeof(m->addr.sin_addr));
332 332
   }
333 333
 
334 334
 	strcpy(m->fn, fn);
... ...
@@ -336,7 +338,8 @@ int mexec_send(mcomm *c, int n, int sync)
336 336
 	m->mdata.head.opcode = MAKUO_OP_SEND;
337 337
   m->mdata.head.nstate = MAKUO_SENDSTATE_STAT;
338 338
 	m->comm      = c;
339
-  m->dryrun    = (mode == MAKUO_MEXEC_DRY);
339
+  m->dryrun    = dryrun;
340
+  m->recurs    = recurs;
340 341
   m->initstate = 1;
341 342
   if(m->dryrun){
342 343
     m->mdata.head.flags |= MAKUO_FLAG_DRYRUN;
... ...
@@ -396,8 +399,10 @@ int mexec_send(mcomm *c, int n, int sync)
396 396
 		  cprintf(0, c, "error: readlink error %s\n", fn);
397 397
 		  lprintf(0, "%s: readlink error %s\n", __func__, fn);
398 398
 		  mfdel(m);
399
+      return(0);
399 400
     }
400
-  }  
401
+  } 
402
+ 
401 403
   return(0);
402 404
 }
403 405
 
... ...
@@ -879,7 +884,8 @@ int mexec(mcomm *c, int n)
879 879
   if(n == 1){
880 880
     for(m=mftop[0];m;m=m->next){
881 881
       if(m->comm == c){
882
-        if(count++ == MAKUO_PARALLEL_MAX){
882
+        count++;
883
+        if(count == MAKUO_PARALLEL_MAX){
883 884
           return(-1);
884 885
         }
885 886
       }
... ...
@@ -339,6 +339,10 @@ static void minit_socket()
339 339
     lprintf(0, "%s: can't create multicast socket\n", __func__);
340 340
     exit(1);
341 341
   }
342
+  if(fcntl(s, F_SETFL , O_NONBLOCK)){
343
+    lprintf(0, "%s: fcntl error\n", __func__);
344
+    exit(1);
345
+  }
342 346
   if(bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1){
343 347
     lprintf(0, "%s: bind error\n", __func__);
344 348
     exit(1);
... ...
@@ -76,7 +76,10 @@ static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr)
76 76
     if(recvsize != -1){
77 77
       break;
78 78
     }else{
79
-      if(errno == EAGAIN || errno == EINTR){
79
+      if(errno == EAGAIN){
80
+        return(-1);
81
+      }
82
+      if(errno == EINTR){
80 83
         continue;
81 84
       }else{
82 85
         lprintf(0, "%s: recv error from %s\n", __func__, inet_ntoa(addr->sin_addr));
... ...
@@ -112,18 +115,19 @@ static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr)
112 112
 * Receive common functions (public)
113 113
 *
114 114
 *******************************************************************/
115
-void mrecv(int s)
115
+int mrecv(int s)
116 116
 {
117 117
   mdata  data;
118 118
   struct sockaddr_in addr;
119 119
   if(mrecv_packet(s, &data, &addr) == -1){
120
-    return;
120
+    return(0);
121 121
   }
122 122
   if(data.head.flags & MAKUO_FLAG_ACK){
123 123
     mrecv_ack(&data, &addr);
124 124
   }else{
125 125
     mrecv_req(&data, &addr);
126 126
   }
127
+  return(1);
127 128
 }
128 129
 
129 130
 void mrecv_gc()
... ...
@@ -1201,7 +1205,7 @@ static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *add
1201 1201
  */
1202 1202
 static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr)
1203 1203
 {
1204
-  lprintf(9, "%s:\n", __func__);
1204
+  lprintf(9, "%s: rid=%06d %s %s\n", __func__, data->head.reqid, OPCODE(data->head.opcode), SSTATE(data->head.nstate));
1205 1205
   mfile *m = mrecv_req_search(data, addr);
1206 1206
   switch(data->head.nstate){
1207 1207
     case MAKUO_SENDSTATE_OPEN:
... ...
@@ -201,12 +201,13 @@ static void msend_ack_md5(int s, mfile *m)
201 201
 
202 202
 static void msend_ack_dsync(int s, mfile *m)
203 203
 {
204
+  mprintf(__func__, m);
204 205
   msend_shot(s, m);
205 206
 }
206 207
 
207 208
 static void msend_ack_del(int s, mfile *m)
208 209
 {
209
-  lprintf(9, "%s:\n", __func__);
210
+  mprintf(__func__, m);
210 211
   msend_shot(s, m);
211 212
 }
212 213
 
... ...
@@ -843,7 +844,7 @@ static void msend_req_dsync_break(int s, mfile *m)
843 843
 /*----- dsync -----*/
844 844
 static void msend_req_dsync(int s, mfile *m)
845 845
 {
846
-  lprintf(9, "%s:\n", __func__);
846
+  lprintf(9, "%s: rid=%d %s %s\n", __func__, m->mdata.head.reqid, OPCODE(m->mdata.head.opcode), SSTATE(m->mdata.head.nstate));
847 847
   if(m->mdata.head.nstate != MAKUO_SENDSTATE_LAST){
848 848
     if(!m->comm){
849 849
       if(m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK){
... ...
@@ -872,10 +873,24 @@ static void msend_req_del_mark(int s, mfile *m)
872 872
 {
873 873
   lprintf(9, "%s:\n", __func__);
874 874
   mfile *d = m->link; /* dsync object */
875
-  if(member_get(&(d->addr.sin_addr))){
876
-    mkack(&(d->mdata), &(d->addr), MAKUO_RECVSTATE_CLOSE);
877
-  }else{
878
-    d->lastrecv.tv_sec = 1;
875
+  if(m->initstate){
876
+    m->initstate = 0;
877
+    m->sendwait  = 1;
878
+    ack_clear(m, -1);
879
+    if(member_get(&(d->addr.sin_addr))){
880
+      mkack(&(d->mdata), &(d->addr), MAKUO_RECVSTATE_CLOSE);
881
+    }else{
882
+      d->lastrecv.tv_sec = 1;
883
+    }
884
+    return;
885
+  }
886
+  if(m->sendwait){
887
+    if(member_get(&(d->addr.sin_addr))){
888
+      mkack(&(d->mdata), &(d->addr), MAKUO_RECVSTATE_CLOSE);
889
+    }else{
890
+      d->lastrecv.tv_sec = 1;
891
+    }
892
+    return;
879 893
   }
880 894
 }
881 895
 
... ...
@@ -942,10 +957,12 @@ static void msend_req_del_stat(int s, mfile *m)
942 942
 
943 943
 static void msend_req_del_last(int s, mfile *m)
944 944
 {
945
+  lprintf(9, "%s:\n", __func__);
945 946
   msend_mfdel(m);
946 947
 }
947 948
 static void msend_req_del_break(int s, mfile *m)
948 949
 {
950
+  lprintf(9, "%s:\n", __func__);
949 951
   if(m->link){
950 952
     m->link->lastrecv.tv_sec = 1;
951 953
   }
... ...
@@ -954,6 +971,7 @@ static void msend_req_del_break(int s, mfile *m)
954 954
 
955 955
 static void msend_req_del_open(int s, mfile *m)
956 956
 {
957
+  lprintf(9, "%s:\n", __func__);
957 958
   if(m->initstate){
958 959
     m->initstate = 0;
959 960
     m->sendwait  = 1;
... ...
@@ -970,6 +988,7 @@ static void msend_req_del_open(int s, mfile *m)
970 970
 
971 971
 static void msend_req_del_data(int s, mfile *m)
972 972
 {
973
+  lprintf(9, "%s:\n", __func__);
973 974
   if(m->initstate){
974 975
     m->initstate = 0;
975 976
     m->sendwait  = 1;
... ...
@@ -988,6 +1007,7 @@ static void msend_req_del_data(int s, mfile *m)
988 988
 
989 989
 static void msend_req_del_close(int s, mfile *m)
990 990
 {
991
+  lprintf(9, "%s:\n", __func__);
991 992
   if(m->initstate){
992 993
     m->initstate = 0;
993 994
     m->sendwait  = 1;