/* * mrecv.c * Copyright (C) 2008-2012 KLab Inc. */ #include "makuosan.h" #include static void mrecv_req(mdata *data, struct sockaddr_in *addr); static void mrecv_ack(mdata *data, struct sockaddr_in *addr); /****************************************************************** * * Receive common functions (private) * *******************************************************************/ static mfile *mrecv_mfdel(mfile *m) { mfile *r; if(!m){ return(NULL); } r = m->next; if(m->fd != -1){ close(m->fd); m->fd = -1; if(!S_ISLNK(m->fs.st_mode) && S_ISREG(m->fs.st_mode)){ mremove(moption.base_dir, m->tn); } } if(m->link){ m->link->link = NULL; m->link = NULL; } while((m->mark = delmark(m->mark))); mfdel(m); return(r); } static int mrecv_decrypt(mdata *data, struct sockaddr_in *addr) { int i; MD5_CTX ctx; uint8_t hash[16]; if(data->head.flags & MAKUO_FLAG_CRYPT){ if(!moption.cryptena){ lprintf(0, "%s: [warn] encrypt packet from %s. I have not key!\n", __func__, inet_ntoa(addr->sin_addr)); return(-1); } if(data->head.szdata){ for(i=0;ihead.szdata;i+=8){ BF_decrypt((BF_LONG *)(data->data + i), &EncKey); } MD5_Init(&ctx); MD5_Update(&ctx, data->data, data->head.szdata); MD5_Final(hash, &ctx); if(memcmp(hash,data->head.hash,16)){ lprintf(0, "[error] %s: protocol checksum error from %s\n", __func__, inet_ntoa(addr->sin_addr)); return(-1); } } }else{ if(moption.cryptena){ lprintf(0, "%s: [warn] not encrypt packet from %s. I have key!\n", __func__, inet_ntoa(addr->sin_addr)); return(-1); } } return(0); } static int mrecv_packet(int s, mdata *data, struct sockaddr_in *addr) { int recvsize; socklen_t addr_len; while(1){ if(!loop_flag){ return(-1); } addr_len = sizeof(struct sockaddr_in); recvsize = recvfrom(s, data, sizeof(mdata), 0, (struct sockaddr *)addr, &addr_len); if(recvsize == -1){ if(errno == EAGAIN){ return(-1); } if(errno == EINTR){ continue; }else{ lprintf(0, "[error] %s: %s recv error\n", __func__, strerror(errno)); return(-1); } } if(recvsize < sizeof(data->head)){ lprintf(0, "[error] %s: recv head size error from %s\n", __func__, inet_ntoa(addr->sin_addr)); return(-1); } data->head.szdata = ntohs(data->head.szdata); data->head.flags = ntohs(data->head.flags); data->head.reqid = ntohl(data->head.reqid); data->head.seqno = ntohl(data->head.seqno); data->head.maddr = data->head.maddr; data->head.mport = data->head.mport; data->head.error = ntohl(data->head.error); if(data->head.maddr != moption.maddr.sin_addr.s_addr){ continue; /* other group packet */ } if(data->head.mport != moption.maddr.sin_port){ continue; /* other group packet */ } if(data->head.vproto != PROTOCOL_VERSION){ continue; /* other protocol */ } if(!mrecv_decrypt(data, addr)){ break; } } return(0); } /****************************************************************** * * ack receive functions (for source node tasks) * *******************************************************************/ static int mrecv_ack_search(mhost **lpt, mfile **lpm, mdata *data, struct sockaddr_in *addr) { mhost *t; mfile *m; *lpt = NULL; *lpm = NULL; t = member_add(&addr->sin_addr, NULL); if(!t){ lprintf(0, "%s: member not found %s\n", __func__, inet_ntoa(addr->sin_addr)); return(-1); } for(m=mftop[0];m;m=m->next){ if(m->mdata.head.reqid == data->head.reqid){ break; } } if(!m){ return(-1); } *lpt = t; *lpm = m; mtimeget(&m->lastrecv); return(0); } static void mrecv_ack_report(mfile *m, mhost *t, mdata *data) { if(data->head.nstate == MAKUO_RECVSTATE_OPENERROR){ cprintf(0, m->comm, "error: %s %s:%s\n", strerror(data->head.error), t->hostname, m->fn); lprintf(0, "[error] %s: %s rid=%06d %s %s:%s\n", __func__, strerror(data->head.error), data->head.reqid, strrstate(data->head.nstate), t->hostname, m->fn); } if(data->head.nstate == MAKUO_RECVSTATE_WRITEERROR){ cprintf(0, m->comm, "error: %s %s:%s\n", strerror(data->head.error), t->hostname, m->fn); lprintf(0, "[error] %s: %s rid=%06d %s %s:%s\n", __func__, strerror(data->head.error), data->head.reqid, strrstate(data->head.nstate), t->hostname, m->fn); } if(data->head.nstate == MAKUO_RECVSTATE_CLOSEERROR){ cprintf(0, m->comm, "error: close error %s:%s\n", t->hostname, m->fn); lprintf(0, "[error] %s: close error rid=%06d %s %s:%s\n", __func__, data->head.reqid, strrstate(data->head.nstate), t->hostname, m->fn); } } static void mrecv_ack_ping(mdata *data, struct sockaddr_in *addr) { member_add(&addr->sin_addr, data); } static void mrecv_ack_send_mark(mdata *data, mfile *m, mhost *t) { uint32_t l; uint32_t h; data->p = data->data; while(!data_safeget32(data, &l)){ if(data_safeget32(data, &h)){ break; } seq_setmark(m, l, h); } } static void mrecv_ack_send(mdata *data, struct sockaddr_in *addr) { mhost *t; mfile *m; if(mrecv_ack_search(&t, &m, data, addr)){ return; } if(data->head.nstate == MAKUO_RECVSTATE_MARK){ if(m->mdata.head.nstate == MAKUO_SENDSTATE_MARK){ mrecv_ack_send_mark(data, m, t); } } if(data->head.nstate == MAKUO_RECVSTATE_OPEN){ if(m->mdata.head.nstate == MAKUO_SENDSTATE_DATA){ mrecv_ack_send_mark(data, m, t); return; } } if(!set_hoststate(t, m, data->head.nstate)){ lprintf(0, "[error] %s: host state error\n", __func__); } mrecv_ack_report(m, t, data); } static void mrecv_ack_md5(mdata *data, struct sockaddr_in *addr) { uint8_t *s; mhost *t; mfile *m; if(mrecv_ack_search(&t, &m, data, addr)){ return; } s = get_hoststate(t,m); if(!s){ lprintf(0, "%s: not allocate state area\n", __func__); return; } if(*s != data->head.nstate){ if(data->head.nstate == MAKUO_RECVSTATE_MD5OK){ cprintf(1, m->comm, "%s: OK %s\r\n", t->hostname, m->fn); lprintf(8, "%s: OK %s:%s\n", __func__, t->hostname, m->fn); } if(data->head.nstate == MAKUO_RECVSTATE_MD5NG){ cprintf(0, m->comm, "%s: NG %s\r\n", t->hostname, m->fn); lprintf(0, "%s: NG %s:%s\n", __func__, t->hostname, m->fn); } } *s = data->head.nstate; mrecv_ack_report(m, t, data); } static void mrecv_ack_dsync(mdata *data, struct sockaddr_in *addr) { mhost *t; mfile *m; if(mrecv_ack_search(&t, &m, data, addr)){ return; } for(m=mftop[MFSEND];m;m=m->next){ if(m->mdata.head.reqid == data->head.reqid){ if(m->comm){ break; } if(m->mdata.head.nstate == MAKUO_SENDSTATE_BREAK){ break; } } } if(!m){ return; } if(data->head.nstate == MAKUO_RECVSTATE_OPEN){ if(m->mdata.head.nstate == MAKUO_SENDSTATE_DATA){ if(data->head.seqno == m->mdata.head.seqno){ if(!set_hoststate(t, m, data->head.nstate)){ lprintf(0, "[error] %s: not allocate state area\n", __func__); } } return; } } if(!set_hoststate(t, m, data->head.nstate)){ lprintf(0, "[error] %s: not allocate state area\n", __func__); } } static void mrecv_ack_del(mdata *data, struct sockaddr_in *addr) { mhost *t; mfile *m; uint32_t err; uint32_t res; uint16_t len; char path[PATH_MAX]; if(mrecv_ack_search(&t, &m, data, addr)){ return; } if(!set_hoststate(t, m, data->head.nstate)){ lprintf(0, "%s: not allocate state area\n", __func__); return; } if(m->mdata.head.nstate == MAKUO_SENDSTATE_OPEN){ m->initstate = 1; m->sendwait = 0; m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE; if(data->head.nstate == MAKUO_RECVSTATE_OPEN){ data->p=data->data; m->mdata.p=m->mdata.data; m->mdata.head.szdata = 0; while(!data_safeget16(data, &len)){ m->mdata.head.nstate = MAKUO_SENDSTATE_DATA; data_safeget32(data, &res); len -= sizeof(uint32_t); data_safeget(data, path, len); path[len] = 0; err = 0; if(m->dryrun){ lprintf(1, "%s: (dryrun) delete %s\n", __func__, path); }else{ if(!mremove(NULL,path)){ lprintf(1, "%s: delete %s\n", __func__, path); }else{ err = errno; lprintf(0, "%s: delete error %s (%s)\n", __func__, path, strerror(errno)); } } data_safeset16(&(m->mdata), len + sizeof(uint32_t)); data_safeset32(&(m->mdata), err); data_safeset(&(m->mdata), path, len); } } } } static void mrecv_ack(mdata *data, struct sockaddr_in *addr) { switch(data->head.opcode){ case MAKUO_OP_PING: mrecv_ack_ping(data, addr); break; case MAKUO_OP_SEND: mrecv_ack_send(data, addr); break; case MAKUO_OP_MD5: mrecv_ack_md5(data, addr); break; case MAKUO_OP_DSYNC: mrecv_ack_dsync(data, addr); break; case MAKUO_OP_DEL: mrecv_ack_del(data, addr); break; /* 機能追加はここへ */ } } /****************************************************************** * * Request receive functions (for destination node tasks) * *******************************************************************/ static mfile *mrecv_req_search(mdata *data, struct sockaddr_in *addr) { mfile *m = mftop[MFRECV]; while(m){ if(!memcmp(&m->addr, addr, sizeof(m->addr)) && m->mdata.head.reqid == data->head.reqid){ break; } m = m->next; } return(m); } static void mrecv_req_ping(mdata *data, struct sockaddr_in *addr) { mping *p; mfile *a; char buff[MAKUO_HOSTNAME_MAX + 1]; member_add(&addr->sin_addr, data); a = mkack(data, addr, MAKUO_RECVSTATE_NONE); if(gethostname(buff, sizeof(buff)) == -1){ buff[0] = 0; } p = (mping *)(a->mdata.data); p->hostnamelen = strlen(buff); p->versionlen = strlen(PACKAGE_VERSION); a->mdata.head.szdata = sizeof(mping) + p->hostnamelen + p->versionlen; a->mdata.p = p->data; memcpy(a->mdata.p, buff, p->hostnamelen); a->mdata.p += p->hostnamelen; memcpy(a->mdata.p, PACKAGE_VERSION, p->versionlen); a->mdata.p += p->versionlen; p->hostnamelen = htons(p->hostnamelen); p->versionlen = htons(p->versionlen); msend(a); } static void mrecv_req_exit(mdata *data, struct sockaddr_in *addr) { mhost *t = member_get(&(addr->sin_addr)); if(!t){ return; } member_del_message(0, t, "member exit"); member_del(t); } static void mrecv_req_send_break(mfile *m, mdata *r) { msend(mkack(r, &(m->addr), MAKUO_RECVSTATE_IGNORE)); mrecv_mfdel(m); } static void mrecv_req_send_stat(mfile *m, mdata *r) { struct stat fs; if(moption.dontrecv){ m->mdata.head.nstate = MAKUO_RECVSTATE_READONLY; }else{ if(r->head.flags & MAKUO_FLAG_SYNC){ if(m->mdata.head.nstate == MAKUO_RECVSTATE_NONE){ m->mdata.head.nstate = MAKUO_RECVSTATE_DELETEOK; if(r->head.flags & MAKUO_FLAG_DRYRUN){ if(lstat(m->fn, &fs) == -1 && errno == ENOENT){ m->mdata.head.nstate = MAKUO_RECVSTATE_DELETENG; } }else{ if(mremove(NULL, m->fn) == -1){ m->mdata.head.nstate = MAKUO_RECVSTATE_DELETENG; } } } }else{ if(S_ISLNK(m->fs.st_mode)){ m->mdata.head.nstate = linkcmp(m); }else{ if(lstat(m->fn, &fs) == -1){ m->mdata.head.nstate = MAKUO_RECVSTATE_UPDATE; }else{ m->mdata.head.nstate = statcmp(&(m->fs), &fs); } } } } msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); } static void mrecv_req_send_open(mfile *m, mdata *r) { char fpath[PATH_MAX]; char tpath[PATH_MAX]; if(m->mdata.head.nstate != MAKUO_RECVSTATE_UPDATE){ if(m->mdata.head.ostate == MAKUO_RECVSTATE_UPDATE){ msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); } return; } mtempname(moption.base_dir, m->fn, m->tn); sprintf(fpath, "%s/%s", moption.base_dir, m->fn); sprintf(tpath, "%s/%s", moption.base_dir, m->tn); m->mdata.head.ostate = m->mdata.head.nstate; m->mdata.head.nstate = MAKUO_RECVSTATE_OPENERROR; if(S_ISLNK(m->fs.st_mode)){ if(!mcreatelink(moption.base_dir, m->tn, m->ln)){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); }else{ lprintf(0, "%s: symlink error %s -> %s\n", __func__, m->ln, m->fn); m->mdata.head.error = errno; } }else{ if(S_ISDIR(m->fs.st_mode)){ if(!is_dir(fpath)){ mcreatedir(moption.base_dir, m->fn, m->fs.st_mode); mkdir(fpath, m->fs.st_mode); } if(is_dir(fpath)){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; set_filestat(fpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); }else{ lprintf(0,"%s: mkdir error %s\n", __func__, m->fn); m->mdata.head.error = errno; } } if(S_ISREG(m->fs.st_mode)){ m->fd = mcreatefile(moption.base_dir, m->tn, m->fs.st_mode); if(m->fd != -1){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; }else{ lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn); m->mdata.head.error = errno; } } if(S_ISCHR(m->fs.st_mode)){ if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); }else{ lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn); m->mdata.head.error = errno; } } if(S_ISBLK(m->fs.st_mode)){ if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); }else{ lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn); m->mdata.head.error = errno; } } if(S_ISFIFO(m->fs.st_mode)){ if(!mcreatenode(moption.base_dir, m->tn, m->fs.st_mode, m->fs.st_rdev)){ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; set_filestat(tpath, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); }else{ lprintf(0, "%s: %s %s\n", __func__, strerror(errno), m->fn); m->mdata.head.error = errno; } } } msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); } static void mrecv_req_send_mark(mfile *m, mdata *r) { mmark *mm; mfile *a; if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){ return; } if(m->mdata.head.seqno < m->seqnomax){ seq_addmark(m, m->mdata.head.seqno, m->seqnomax); m->mdata.head.seqno = m->seqnomax; } m->lickflag = 1; a = mkack(&(m->mdata),&(m->addr),MAKUO_RECVSTATE_MARK); if(!a){ lprintf(0, "[error] %s: out of momory\n", __func__); return; } if(a->mdata.head.szdata){ msend(a); return; } for(mm=m->mark;mm;mm=mm->next){ if(data_safeset32(&(a->mdata), mm->l)){ break; } if(data_safeset32(&(a->mdata), mm->h)){ a->mdata.head.szdata -= sizeof(uint32_t); break; } } msend(a); } static void mrecv_req_send_data_write(mfile *m, mdata *r) { off_t offset; if(r->head.szdata == 0){ return; } offset = r->head.seqno; offset *= MAKUO_BUFFER_SIZE; if(lseek(m->fd, offset, SEEK_SET) == -1){ m->mdata.head.error = errno; m->mdata.head.ostate = m->mdata.head.nstate; m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: seek error (%s) seq=%u\n", __func__, strerror(m->mdata.head.error), (int)(r->head.seqno)); return; /* lseek error */ } if(write(m->fd, r->data, r->head.szdata) == -1){ m->mdata.head.error = errno; m->mdata.head.ostate = m->mdata.head.nstate; m->mdata.head.nstate = MAKUO_RECVSTATE_WRITEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: write error (%s) seqno=%d size=%d %s\n", __func__, strerror(m->mdata.head.error), (int)(r->head.seqno), r->head.szdata, m->fn); return; /* write error */ } m->recvcount++; } static void mrecv_req_send_data_retry(mfile *m, mdata *r) { mmark *mm; mfile *a = mkack(&(m->mdata), &(m->addr), MAKUO_RECVSTATE_OPEN); if(!a){ lprintf(0, "%s: out of momory\n", __func__); return; } if(a->mdata.head.szdata == 0){ data_safeset32(&(a->mdata), m->mdata.head.seqno); data_safeset32(&(a->mdata), r->head.seqno); for(mm=m->mark;mm;mm=mm->next){ if(data_safeset32(&(a->mdata), mm->l)){ break; } if(data_safeset32(&(a->mdata), mm->h)){ a->mdata.head.szdata -= sizeof(uint32_t); break; } } } msend(a); } static void mrecv_req_send_data(mfile *m, mdata *r) { if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){ return; } if(m->lickflag){ if(seq_delmark(m, r->head.seqno)){ mrecv_req_send_data_write(m, r); } return; } if(m->mdata.head.seqno > r->head.seqno){ if(seq_delmark(m, r->head.seqno)){ mrecv_req_send_data_write(m, r); } return; } if(m->mdata.head.seqno < r->head.seqno){ mrecv_req_send_data_retry(m, r); seq_addmark(m, m->mdata.head.seqno, r->head.seqno); m->mdata.head.seqno = r->head.seqno; } mrecv_req_send_data_write(m, r); m->mdata.head.seqno++; } static void mrecv_req_send_close(mfile *m, mdata *r) { struct stat fs; struct utimbuf mftime; char path[PATH_MAX]; if(m->mdata.head.nstate == MAKUO_RECVSTATE_CLOSE || m->mdata.head.nstate == MAKUO_RECVSTATE_CLOSEERROR){ msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); return; } m->mdata.head.ostate = m->mdata.head.nstate; if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); return; } mftime.actime = m->fs.st_ctime; mftime.modtime = m->fs.st_mtime; if(S_ISLNK(m->fs.st_mode)){ if(!mrename(moption.base_dir, m->tn, m->fn)){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); }else{ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: close error %s -> %s\n", __func__, m->ln, m->fn); mremove(moption.base_dir, m->tn); } return; } if(S_ISDIR(m->fs.st_mode)){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); sprintf(path, "%s/%s", moption.base_dir, m->fn); utime(path, &mftime); return; } if(!S_ISREG(m->fs.st_mode)){ if(!mrename(moption.base_dir, m->tn, m->fn)){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); sprintf(path, "%s/%s", moption.base_dir, m->tn); utime(path, &mftime); }else{ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: close error %s (can't rename)\n", __func__, m->fn); mremove(moption.base_dir, m->tn); } return; } if(m->fd == -1){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: bat discriptor close error %s\n", __func__, m->fn); mremove(moption.base_dir, m->tn); return; } if(fstat(m->fd, &fs) == -1){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; m->mdata.head.error = errno; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: %s fstat error %s\n", __func__, strerror(errno), m->fn); return; } if(close(m->fd) == -1){ m->fd = -1; m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; m->mdata.head.error = errno; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: %s close error %s\n", __func__, strerror(errno), m->fn); return; } m->fd = -1; if(fs.st_size != m->fs.st_size){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "[error] %s: close error %s (file size mismatch %d != %d)\n", __func__, m->fn, (int)(fs.st_size), (int)(m->fs.st_size)); lprintf(0, "[error] %s: seq=%d max=%d mark=%d recv=%d\n", __func__, m->mdata.head.seqno, m->seqnomax, m->markcount, m->recvcount); mremove(moption.base_dir, m->tn); return; } if(!mrename(moption.base_dir, m->tn, m->fn)){ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); sprintf(path, "%s/%s", moption.base_dir, m->fn); set_filestat(path, m->fs.st_uid, m->fs.st_gid, m->fs.st_mode); utime(path, &mftime); }else{ m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSEERROR; msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); lprintf(0, "%s: close error %s (can't rename)\n", __func__, m->fn); mremove(moption.base_dir, m->tn); } } static void mrecv_req_send_last(mfile *m, mdata *r) { msend(mkack(r, &(m->addr), MAKUO_RECVSTATE_LAST)); mrecv_mfdel(m); } static void mrecv_req_send_next(mfile *m, mdata *r) { switch(r->head.nstate){ case MAKUO_SENDSTATE_STAT: mrecv_req_send_stat(m, r); break; case MAKUO_SENDSTATE_OPEN: mrecv_req_send_open(m, r); break; case MAKUO_SENDSTATE_DATA: mrecv_req_send_data(m, r); break; case MAKUO_SENDSTATE_MARK: mrecv_req_send_mark(m, r); break; case MAKUO_SENDSTATE_CLOSE: mrecv_req_send_close(m, r); break; case MAKUO_SENDSTATE_LAST: mrecv_req_send_last(m, r); break; case MAKUO_SENDSTATE_BREAK: mrecv_req_send_break(m, r); break; } } static mfile *mrecv_req_send_create(mdata *data, struct sockaddr_in *addr) { mstat fs; mfile *m; uint16_t fnlen; uint16_t lnlen; uint32_t ldev; uint32_t hdev; uint64_t rdev; if((m = mrecv_req_search(data, addr))){ return(m); } if(data->head.nstate != MAKUO_SENDSTATE_STAT){ return(NULL); } /* create object */ if(!(m = mfadd(MFRECV))){ lprintf(0, "[error] %s: out of momory\n", __func__); return(NULL); } /* copy header and addr */ memcpy(&(m->addr), addr, sizeof(m->addr)); memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head)); /* read mstat */ data->p = data->data; data_safeget(data, &fs, sizeof(fs)); /* stat = mstat */ m->fs.st_mode = ntohl(fs.mode); m->fs.st_uid = ntohs(fs.uid); m->fs.st_gid = ntohs(fs.gid); m->fs.st_size = ((off_t)ntohl(fs.sizeh) << 32) + (off_t)ntohl(fs.sizel); m->fs.st_mtime = ntohl(fs.mtime); m->fs.st_ctime = ntohl(fs.ctime); fnlen = ntohs(fs.fnlen); lnlen = ntohs(fs.lnlen); /* read filename */ data_safeget(data, m->fn, fnlen); m->fn[fnlen] = 0; /* read linkname */ data_safeget(data, m->ln, lnlen); m->ln[lnlen] = 0; /* rdev */ data_safeget32(data, &hdev); data_safeget32(data, &ldev); rdev = hdev; rdev <<= 32; rdev |= ldev; m->fs.st_rdev = (dev_t)rdev; /* Number of blocks */ m->seqnomax = m->fs.st_size / MAKUO_BUFFER_SIZE; if(m->fs.st_size % MAKUO_BUFFER_SIZE){ m->seqnomax++; } return(m); } static void mrecv_req_send(mdata *data, struct sockaddr_in *addr) { mfile *m; if((m = mrecv_req_send_create(data, addr))){ mtimeget(&(m->lastrecv)); mrecv_req_send_next(m, data); }else{ if(data->head.nstate != MAKUO_SENDSTATE_DATA){ msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE)); } } } static void mrecv_req_md5_open(mfile *m, mdata *data, struct sockaddr_in *addr) { int l; mhash *h; if(!m){ m = mfadd(MFRECV); memcpy(&(m->addr), addr, sizeof(m->addr)); memcpy(&(m->mdata.head), &(data->head), sizeof(m->mdata.head)); h = (mhash *)(data->data); l = ntohs(h->fnlen); memcpy(m->fn, h->filename, l); m->fn[l] = 0; memcpy(m->mdata.data, h->hash, 16); m->fd = open(m->fn, O_RDONLY); if(m->fd == -1){ m->mdata.head.error = errno; m->mdata.head.nstate = MAKUO_RECVSTATE_OPENERROR; }else{ m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; m->link = mkack(&(m->mdata), &(m->addr), MAKUO_RECVSTATE_NONE); m->link->link = m; MD5_Init(&(m->md5)); } } mtimeget(&(m->lastrecv)); msend(mkack(&(m->mdata), &(m->addr), m->mdata.head.nstate)); } static void mrecv_req_md5_close(mfile *m, mdata *data, struct sockaddr_in *addr) { if(m){ if(m->link){ close(m->fd); m->fd = -1; MD5_Final(m->mdata.data, &(m->md5)); m->link->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; m->link->link = NULL; m->link = NULL; } mrecv_mfdel(m); } msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE)); } static void mrecv_req_md5(mdata *data, struct sockaddr_in *addr) { mfile *m = mrecv_req_search(data, addr); switch(data->head.nstate){ case MAKUO_SENDSTATE_OPEN: mrecv_req_md5_open(m, data, addr); break; case MAKUO_SENDSTATE_CLOSE: mrecv_req_md5_close(m, data, addr); break; } } static int dsync_write(int fd, char *base, uint8_t sta, uint16_t len, uint32_t mod) { int r; size_t s; fd_set wfds; char buff[PATH_MAX + sizeof(sta) + sizeof(mod) + sizeof(len)]; char *p = buff; if(!loop_flag){ return(1); } if(!strcmp(base, ".")){ return(0); } while(len >= 2){ if(memcmp(base, "./", 2)){ break; }else{ base += 2; len -= 2; if(!len){ return(0); } } } while(*base == '/'){ base++; len--; if(!len){ return(0); } } *(uint16_t *)p = len + sizeof(mod); p += sizeof(len); *(uint32_t *)p = mod; p += sizeof(mod); memcpy(p, base, len); p = buff; s = sizeof(len) + sizeof(mod) + len; while(s){ FD_ZERO(&wfds); FD_SET(fd,&wfds); if(select(1024, NULL, &wfds, NULL, NULL) < 0){ if(!loop_flag){ return(1); } continue; } if(FD_ISSET(fd,&wfds)){ r = write(fd, p, s); if(r == -1){ if(errno == EINTR){ continue; } lprintf(0, "[error] %s: write error %s\n", __func__, base); return(-1); }else{ s -= r; p += r; } } } return(0); } static int dsync_scan(int fd, char *base, int recurs, excludeitem *e) { int r; DIR *d; uint16_t len; struct stat st; struct dirent *dent; char path[PATH_MAX]; if(!loop_flag){ return(1); } /*----- read only -----*/ if(moption.dontrecv){ return(0); } /*----- exclude -----*/ if(isexclude(base, e, 0)){ return(0); } len = strlen(base); if(lstat(base, &st) == -1){ return(dsync_write(fd, base, MAKUO_SENDSTATE_ERROR, len, st.st_mode)); } if(S_ISLNK(st.st_mode)){ return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode)); } if(!S_ISDIR(st.st_mode)){ return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode)); } /*----- exclude dir -----*/ if(isexclude(base, e, 1)){ return(0); } /*----- dir scan -----*/ if((d = opendir(base))){ while((dent=readdir(d))){ if(!loop_flag){ break; } if(!strcmp(dent->d_name, ".")){ continue; } if(!strcmp(dent->d_name, "..")){ continue; } sprintf(path, "%s/%s", base, dent->d_name); if(recurs){ if((r = dsync_scan(fd, path, recurs, e))){ closedir(d); return(r); } }else{ len = strlen(path); if((r = dsync_write(fd, path, MAKUO_SENDSTATE_STAT, len, st.st_mode))){ closedir(d); return(r); } } } closedir(d); len = strlen(base); return(dsync_write(fd, base, MAKUO_SENDSTATE_STAT, len, st.st_mode)); } return(dsync_write(fd, base, MAKUO_SENDSTATE_ERROR, len, st.st_mode)); } static void mrecv_req_dsync_open(mfile *m, mdata *data, struct sockaddr_in *addr) { msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN)); if(m){ return; } m = mfadd(MFRECV); m->mdata.head.opcode = data->head.opcode; m->mdata.head.reqid = data->head.reqid; m->mdata.head.flags = data->head.flags; m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; memcpy(&(m->addr), addr, sizeof(m->addr)); if(data->head.szdata){ memcpy(m->fn, data->data, data->head.szdata); } m->fn[data->head.szdata] = 0; mtimeget(&(m->lastrecv)); } static void mrecv_req_dsync_data(mfile *m, mdata *data, struct sockaddr_in *addr) { int pid; int p[2]; mfile *d; char path[PATH_MAX]; uint16_t len; msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN)); if(!m){ return; } mtimeget(&(m->lastrecv)); if(m->mdata.head.seqno >= data->head.seqno){ return; } if(data->head.szdata){ data->p = data->data; while(!data_safeget16(data, &len)){ data_safeget(data, path, len); path[len] = 0; m->exclude = exclude_add(m->exclude, path); m->mdata.head.seqno++; } return; } d = mfins(MFSEND); d->link = m; m->link = d; d->initstate = 1; d->sendwait = 0; d->exclude = m->exclude; m->exclude = NULL; d->mdata.head.opcode = MAKUO_OP_DEL; d->mdata.head.reqid = getrid(); d->mdata.head.flags = data->head.flags; d->mdata.head.seqno = data->head.reqid; d->mdata.head.nstate = MAKUO_SENDSTATE_STAT; memcpy(&(d->addr), addr, sizeof(d->addr)); strcpy(d->fn, m->fn); d->sendto = 1; if(d->mdata.head.flags & MAKUO_FLAG_RECURS){ d->recurs = 1; } if(d->mdata.head.flags & MAKUO_FLAG_DRYRUN){ d->dryrun = 1; } /* fork */ pipe(p); pid = fork(); if(pid == -1){ lprintf(0, "%s: %s fork error\n", __func__, strerror(errno)); close(p[0]); close(p[1]); return; } if(pid){ /* parent */ d->pid = pid; d->pipe = p[0]; close(p[1]); /* write close */ while((d->exclude = exclude_del(d->exclude))); }else{ /* child */ close(p[0]); /* read close */ dsync_scan(p[1], d->fn, d->recurs, d->exclude); close(p[1]); _exit(0); } } static void mrecv_req_dsync_close(mfile *m, mdata *data, struct sockaddr_in *addr) { if(!m){ msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE)); }else{ mtimeget(&(m->lastrecv)); if(m->link){ msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN)); }else{ msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE)); mrecv_mfdel(m); } } } static void mrecv_req_dsync_break(mfile *m, mdata *data, struct sockaddr_in *addr) { msend(mkack(data, addr, MAKUO_RECVSTATE_BREAK)); if(m){ if(m->link){ m->link->mdata.head.nstate = MAKUO_SENDSTATE_BREAK; m->link->sendwait = 0; } mrecv_mfdel(m); } } /* * dsync */ static void mrecv_req_dsync(mdata *data, struct sockaddr_in *addr) { mfile *m = mrecv_req_search(data, addr); switch(data->head.nstate){ case MAKUO_SENDSTATE_OPEN: mrecv_req_dsync_open(m, data, addr); break; case MAKUO_SENDSTATE_DATA: mrecv_req_dsync_data(m, data, addr); break; case MAKUO_SENDSTATE_CLOSE: mrecv_req_dsync_close(m, data, addr); break; case MAKUO_SENDSTATE_BREAK: mrecv_req_dsync_break(m, data, addr); break; } } /* * del */ static void mrecv_req_del_open(mdata *data, struct sockaddr_in *addr) { uint16_t len; uint32_t mod; mfile *a = mkack(data, addr, MAKUO_RECVSTATE_OPEN); mfile *m = mrecv_req_search(data, addr); mhost *t = member_get(&(addr->sin_addr)); char path[PATH_MAX]; if(!a){ lprintf(0, "[error] %s: ack can't create\n", __func__); return; } data->p = data->data; while(!data_safeget16(data, &len)){ data_safeget32(data, &mod); len -= sizeof(uint32_t); data_safeget(data, path, len); path[len] = 0; #ifdef MAKUO_DEBUG lprintf(9, "%s: %s", __func__, path); #endif if(lstat(path, &(a->fs)) == -1 && errno == ENOENT){ #ifdef MAKUO_DEBUG lprintf(9, " [DELETE]"); #endif data_safeset16(&(a->mdata), len + sizeof(uint32_t)); data_safeset32(&(a->mdata), 0); data_safeset(&(a->mdata), path, len); } #ifdef MAKUO_DEBUG lprintf(9, "\n"); #endif } msend(a); if(m){ return; } m = mfadd(MFRECV); m->mdata.head.opcode = data->head.opcode; m->mdata.head.reqid = data->head.reqid; m->mdata.head.nstate = MAKUO_RECVSTATE_OPEN; memcpy(&(m->addr), addr, sizeof(m->addr)); mtimeget(&(m->lastrecv)); if(data->head.flags & MAKUO_FLAG_DRYRUN){ m->dryrun = 1; } } static void mrecv_req_del_data_report(mfile *m, mcomm *c, uint32_t err, char *hn, char *path) { char *dryrun = ""; if(m->dryrun){ dryrun = "(dryrun) "; } if(err){ cprintf(0, c, "delete error (%s) %s:%s\n", strerror(err), hn, path); lprintf(1, "delete error (%s) %s:%s\n", strerror(err), hn, path); }else{ cprintf(0, c, "%sdelete %s:%s\n", dryrun, hn, path); lprintf(1, "%sdelete %s:%s\n", dryrun, hn, path); } } static void mrecv_req_del_data(mdata *data, struct sockaddr_in *addr) { uint32_t err; uint16_t len; char *hn = "unknown host"; mhost *t = member_get(&(addr->sin_addr)); mcomm *c = NULL; mfile *a = NULL; mfile *m = mrecv_req_search(data, addr); char path[PATH_MAX]; msend(mkack(data, addr, MAKUO_RECVSTATE_OPEN)); if(!m){ return; } if(m->mdata.head.nstate != MAKUO_RECVSTATE_OPEN){ return; } if(t){ hn = t->hostname; } for(a=mftop[MFSEND];a;a=a->next){ if((a->mdata.head.reqid == data->head.seqno) && (a->comm != NULL)){ c = a->comm; break; } } data->p = data->data; while(!data_safeget16(data, &len)){ data_safeget32(data, &err); len -= sizeof(uint32_t); data_safeget(data, path, len); path[len] = 0; mrecv_req_del_data_report(m, c, err, hn, path); } m->mdata.head.ostate = m->mdata.head.nstate; m->mdata.head.nstate = MAKUO_RECVSTATE_CLOSE; } static void mrecv_req_del_close(mdata *data, struct sockaddr_in *addr) { mfile *m = mrecv_req_search(data, addr); msend(mkack(data, addr, MAKUO_RECVSTATE_CLOSE)); mrecv_mfdel(m); } static void mrecv_req_del(mdata *data, struct sockaddr_in *addr) { switch(data->head.nstate){ case MAKUO_SENDSTATE_OPEN: mrecv_req_del_open(data, addr); break; case MAKUO_SENDSTATE_DATA: mrecv_req_del_data(data, addr); break; case MAKUO_SENDSTATE_CLOSE: mrecv_req_del_close(data, addr); break; } } static void mrecv_req(mdata *data, struct sockaddr_in *addr) { switch(data->head.opcode){ case MAKUO_OP_PING: mrecv_req_ping(data, addr); break; case MAKUO_OP_EXIT: mrecv_req_exit(data, addr); break; case MAKUO_OP_SEND: mrecv_req_send(data, addr); break; case MAKUO_OP_MD5: mrecv_req_md5(data, addr); break; case MAKUO_OP_DSYNC: mrecv_req_dsync(data, addr); break; case MAKUO_OP_DEL: mrecv_req_del(data, addr); break; default: msend(mkack(data, addr, MAKUO_RECVSTATE_IGNORE)); break; } } /****************************************************************** * * Receive common functions (public) * *******************************************************************/ void mrecv_gc() { mhost *t = members; mfile *m = mftop[MFRECV]; /* file timeout */ while(m){ if(mtimeout(&(m->lastrecv), MAKUO_RECV_GCWAIT)){ lprintf(0,"%s: mfile object GC state=%s %s\n", __func__, strrstate(m->mdata.head.nstate), m->fn); m = mrecv_mfdel(m); continue; } m = m->next; } /* pong timeout */ while(t){ if(!mtimeout(&(t->lastrecv), MAKUO_PONG_TIMEOUT)){ t = t->next; }else{ lprintf(0,"%s: pong timeout %s\n", __func__, t->hostname); member_del_message(1, t, "pong time out"); if(t->next){ t = t->next; member_del(t->prev); }else{ member_del(t); t = NULL; } } } } void mrecv_clean() { mfile *m = mftop[MFRECV]; while((m = mrecv_mfdel(m))); } int mrecv() { mfile *m; mhost *t; mdata data; struct sockaddr_in addr; m = mftop[MFSEND]; if(mrecv_packet(moption.mcsocket, &data, &addr) == -1){ return(0); } if((t = member_get(&addr.sin_addr))){ mtimeget(&(t->lastrecv)); } if(data.head.flags & MAKUO_FLAG_ACK){ mrecv_ack(&data, &addr); }else{ mrecv_req(&data, &addr); } return(m == mftop[MFSEND]); }