... | ... |
@@ -979,7 +979,7 @@ int mexec_status(mcomm *c, int n) |
979 | 979 |
cprintf(0, c, "recv op : %d\n", count); |
980 | 980 |
for(m=mftop[MFRECV];m;m=m->next){ |
981 | 981 |
t = localtime(&(m->lastrecv.tv_sec)); |
982 |
- cprintf(0, c, " %s %s %02d:%02d:%02d %s (%d/%d) mark=%d rid=%d\n", |
|
982 |
+ cprintf(0, c, " %s %s %02d:%02d:%02d %s (%d/%d) mark=%d rid=%d ip=%s\n", |
|
983 | 983 |
stropcode(&(m->mdata)), |
984 | 984 |
strrstate(m->mdata.head.nstate), |
985 | 985 |
t->tm_hour, t->tm_min, t->tm_sec, |
... | ... |
@@ -987,7 +987,8 @@ int mexec_status(mcomm *c, int n) |
987 | 987 |
m->recvcount, |
988 | 988 |
m->seqnomax, |
989 | 989 |
m->markcount, |
990 |
- m->mdata.head.reqid); |
|
990 |
+ m->mdata.head.reqid, |
|
991 |
+ inet_ntoa(m->addr.sin_addr)); |
|
991 | 992 |
} |
992 | 993 |
return(0); |
993 | 994 |
} |
... | ... |
@@ -298,6 +298,11 @@ static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr) |
298 | 298 |
if(mrecv_ack_search(&t, &m, data, addr)){ |
299 | 299 |
return; |
300 | 300 |
} |
301 |
+ if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){ |
|
302 |
+ if(data->head.nstate != MAKUO_RECVSTATE_CLOSE){ |
|
303 |
+ return; |
|
304 |
+ } |
|
305 |
+ } |
|
301 | 306 |
if(!set_hoststate(t, m, data->head.nstate)){ |
302 | 307 |
lprintf(0, "%s: not allocate state area\n", __func__); |
303 | 308 |
return; |
... | ... |
@@ -319,13 +324,13 @@ static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr) |
319 | 319 |
|
320 | 320 |
err = 0; |
321 | 321 |
if(m->dryrun){ |
322 |
- lprintf(1, "%s: (dryrun) delete %s\n", __func__, path); |
|
322 |
+ lprintf(1, "%s: (dryrun) delete %s rid=%d\n", __func__, path, m->mdata.head.reqid); |
|
323 | 323 |
}else{ |
324 | 324 |
if(!mremove(NULL,path)){ |
325 |
- lprintf(1, "%s: delete %s\n", __func__, path); |
|
325 |
+ lprintf(1, "%s: delete %s rid=%d\n", __func__, path, m->mdata.head.reqid); |
|
326 | 326 |
}else{ |
327 | 327 |
err = errno; |
328 |
- lprintf(0, "%s: delete error %s (%s)\n", __func__, path, strerror(errno)); |
|
328 |
+ lprintf(0, "%s: delete error %s (%s) rid=%d\n", __func__, path, strerror(errno), m->mdata.head.reqid); |
|
329 | 329 |
} |
330 | 330 |
} |
331 | 331 |
data_safeset16(&(m->mdata), len + sizeof(uint32_t)); |
... | ... |
@@ -1219,6 +1224,8 @@ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr) |
1219 | 1219 |
uint16_t len; |
1220 | 1220 |
uint32_t mod; |
1221 | 1221 |
mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN); |
1222 |
+ mfile *m = mrecv_req_search(data, addr); |
|
1223 |
+ mhost *t = member_get(&(addr->sin_addr)); |
|
1222 | 1224 |
char path[PATH_MAX]; |
1223 | 1225 |
|
1224 | 1226 |
if(!a){ |
... | ... |
@@ -1247,6 +1254,19 @@ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr) |
1247 | 1247 |
#endif |
1248 | 1248 |
} |
1249 | 1249 |
msend(a); |
1250 |
+ |
|
1251 |
+ if(m){ |
|
1252 |
+ return; |
|
1253 |
+ } |
|
1254 |
+ m = mfadd(MFRECV); |
|
1255 |
+ m->mdata.head.opcode = data->head.opcode; |
|
1256 |
+ m->mdata.head.reqid = data->head.reqid; |
|
1257 |
+ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; |
|
1258 |
+ memcpy(&(m->addr), addr, sizeof(m->addr)); |
|
1259 |
+ mtimeget(&(m->lastrecv)); |
|
1260 |
+ if(data->head.flags & MAKUO_FLAG_DRYRUN){ |
|
1261 |
+ m->dryrun = 1; |
|
1262 |
+ } |
|
1250 | 1263 |
} |
1251 | 1264 |
|
1252 | 1265 |
static void mrecv_req_del_data_report(mfile *m, mcomm *c, uint32_t err, char *hn, char *path) |
... | ... |
@@ -1277,20 +1297,16 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr) |
1277 | 1277 |
char path[PATH_MAX]; |
1278 | 1278 |
|
1279 | 1279 |
msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN)); |
1280 |
- if(m){ |
|
1280 |
+ if(!m){ |
|
1281 |
+ return; |
|
1282 |
+ } |
|
1283 |
+ mtimeget(&(m->lastrecv)); |
|
1284 |
+ if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){ |
|
1281 | 1285 |
return; |
1282 | 1286 |
} |
1283 | 1287 |
if(t){ |
1284 | 1288 |
hn = t->hostname; |
1285 | 1289 |
} |
1286 |
- m = mfadd(MFRECV); |
|
1287 |
- m->mdata.head.opcode = data->head.opcode; |
|
1288 |
- m->mdata.head.reqid = data->head.reqid; |
|
1289 |
- m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; |
|
1290 |
- memcpy(&(m->addr), addr, sizeof(m->addr)); |
|
1291 |
- if(data->head.flags & MAKUO_FLAG_DRYRUN){ |
|
1292 |
- m->dryrun = 1; |
|
1293 |
- } |
|
1294 | 1290 |
for(a=mftop[MFSEND];a;a=a->next){ |
1295 | 1291 |
if((a->mdata.head.reqid == data->head.seqno) && (a->comm != NULL)){ |
1296 | 1292 |
c = a->comm; |
... | ... |
@@ -1305,6 +1321,8 @@ static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr) |
1305 | 1305 |
path[len] = 0; |
1306 | 1306 |
mrecv_req_del_data_report(m, c, err, hn, path); |
1307 | 1307 |
} |
1308 |
+ m->mdata.head.ostate = m->mdata.head.nstate; |
|
1309 |
+ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; |
|
1308 | 1310 |
} |
1309 | 1311 |
|
1310 | 1312 |
static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr) |
... | ... |
@@ -1087,20 +1087,20 @@ static void msend_req_del_stat(int s, mfile *m) |
1087 | 1087 |
{ |
1088 | 1088 |
if(m->pipe != -1){ |
1089 | 1089 |
msend_req_del_stat_read(s, m); |
1090 |
+ return; |
|
1091 |
+ } |
|
1092 |
+ if(msend_req_del_stat_waitcheck(s, m)){ |
|
1093 |
+ m->sendwait = 1; |
|
1094 |
+ return; |
|
1095 |
+ } |
|
1096 |
+ if(m->link){ |
|
1097 |
+ msend(mkack(&(m->link->mdata), &(m->link->addr), MAKUO_RECVSTATE_CLOSE)); /* send ack for dsync */ |
|
1098 |
+ } |
|
1099 |
+ if(waitpid(m->pid, NULL, WNOHANG) != m->pid){ |
|
1100 |
+ m->sendwait = 1; |
|
1090 | 1101 |
}else{ |
1091 |
- if(msend_req_del_stat_waitcheck(s, m)){ |
|
1092 |
- m->sendwait = 1; |
|
1093 |
- }else{ |
|
1094 |
- if(m->link){ |
|
1095 |
- msend(mkack(&(m->link->mdata), &(m->link->addr), MAKUO_RECVSTATE_CLOSE)); |
|
1096 |
- } |
|
1097 |
- if(waitpid(m->pid, NULL, WNOHANG) != m->pid){ |
|
1098 |
- m->sendwait = 1; |
|
1099 |
- }else{ |
|
1100 |
- m->pid = 0; |
|
1101 |
- msend_mfdel(m); |
|
1102 |
- } |
|
1103 |
- } |
|
1102 |
+ m->pid = 0; |
|
1103 |
+ msend_mfdel(m); |
|
1104 | 1104 |
} |
1105 | 1105 |
} |
1106 | 1106 |
|
... | ... |
@@ -1110,6 +1110,7 @@ static void msend_req_del_break(int s, mfile *m) |
1110 | 1110 |
for(d=mftop[MFSEND];d;d=d->next){ |
1111 | 1111 |
if(d->link == m){ |
1112 | 1112 |
if(d->mdata.head.nstate == MAKUO_SENDSTATE_WAIT){ |
1113 |
+ d->link = NULL; |
|
1113 | 1114 |
msend_mfdel(d); |
1114 | 1115 |
break; |
1115 | 1116 |
} |