/*
 * msend.c
 * Copyright (C) 2008 KLab Inc. 
 */
#include "makuosan.h"

static void msend_ack(int s, mfile *m);
static void msend_req(int s, mfile *m);

/******************************************************************
*
* send common functions (private)
*
*******************************************************************/
/*
 * Release everything a send object holds and pass the object to mfdel().
 *
 * Closes m->fd and m->pipe when they are not -1, sends SIGTERM to
 * m->pid and waits for it when the pid is non-zero, frees m->mark and
 * clears the per-host state through clr_hoststate().
 *
 * Returns m->next as it was before the teardown, or NULL when m is NULL.
 */
static mfile *msend_mfdel(mfile *m)
{
  mfile *following;

  if(m == NULL){
    return(NULL);
  }
  following = m->next; /* sample the link before m is handed to mfdel() */

  if(m->fd != -1){
    close(m->fd);
  }
  if(m->pipe != -1){
    close(m->pipe);
  }
  if(m->pid){
    /* terminate the process and wait (blocking) until it is reaped */
    kill(m->pid, SIGTERM);
    waitpid(m->pid, NULL, 0);
  }
  free(m->mark); /* free(NULL) is a no-op */
  clr_hoststate(m);
  mfdel(m);
  return(following);
}

/*
 * Encrypt the payload of data in place when moption.cryptena is set.
 *
 * With encryption enabled the CRYPT flag is added to the header; for a
 * non-empty payload the MD5 of the plain payload is stored in head.hash
 * and the payload is passed to BF_encrypt() in 8-byte steps.
 * head.szdata itself is never modified here.
 *
 * Returns the number of payload bytes to transmit: head.szdata when
 * nothing was encrypted, otherwise the offset at which the 8-byte
 * stepping stopped (the first multiple of 8 that is >= head.szdata).
 */
static int msend_encrypt(mdata *data)
{
  int offset;
  MD5_CTX md5;

  if(!moption.cryptena){
    return(data->head.szdata);
  }
  data->head.flags |= MAKUO_FLAG_CRYPT;
  if(!data->head.szdata){
    return(data->head.szdata);
  }

  /* digest of the plain text goes into the header before encrypting */
  MD5_Init(&md5);
  MD5_Update(&md5, data->data, data->head.szdata);
  MD5_Final(data->head.hash, &md5);

  offset = 0;
  while(offset < data->head.szdata){
    BF_encrypt((BF_LONG *)(data->data + offset), &EncKey);
    offset += 8;
  }
  return(offset);
}

/*
 * Transmit one packet to addr through socket s.
 *
 * The packet is copied to a local buffer first, so the caller's data is
 * left untouched by the encryption and byte-order conversion done here.
 * sendto() is retried for as long as it fails with EINTR.
 *
 * Returns  1 when sendto() accepted the whole packet,
 *          0 when sendto() reported a different (short) length,
 *         -1 when sendto() failed with an error other than EINTR.
 */
static int msend_packet(int s, mdata *data, struct sockaddr_in *addr)
{
  int r;
  int szdata;
  int sendsize;
  mdata senddata;

  memcpy(&senddata, data, sizeof(senddata));
  szdata = msend_encrypt(&senddata);

  /*
   * Header plus (possibly padded) payload.  Kept as an int so that it
   * matches the %d conversions in the log messages below; passing the
   * size_t expression directly to %d is undefined behaviour.
   */
  sendsize = (int)sizeof(mhead) + szdata;

  /* header fields go out in network byte order */
  senddata.head.szdata = htons(senddata.head.szdata);
  senddata.head.flags  = htons(senddata.head.flags);
  senddata.head.reqid  = htonl(senddata.head.reqid);
  senddata.head.seqno  = htonl(senddata.head.seqno);
 
  while(1){ 
    r = sendto(s, &senddata, sendsize, 0, (struct sockaddr*)addr, sizeof(struct sockaddr_in));
    if(r == sendsize){
      break;
    }
    if(r != -1){
      /* short send: report it and give up on this packet */
      lprintf(0,"%s: size error sock=%d op=%d rid=%d state=%s size=%d send=%d seqno=%d\n", __func__,
        s, data->head.opcode, data->head.reqid, SSTATE(data->head.nstate), sendsize, r, data->head.seqno);
      return(0);
    }
    if(errno == EINTR){
      continue; /* interrupted by a signal: try again */
    }
    lprintf(0,"%s: send error (%s) sock=%d op=%d rid=%d state=%s size=%d seqno=%d\n", __func__,
      strerror(errno), s, data->head.opcode, data->head.reqid, SSTATE(data->head.nstate), sendsize, data->head.seqno);
    return(-1);
  }
  return(1);
}

/*
 * Retry bookkeeping, run before every send of m.
 *
 * While m is not waiting for replies (sendwait == 0) the retry counter
 * is simply reloaded with MAKUO_SEND_RETRYCNT.  Otherwise the retry is
 * logged together with the per-host receive state and the counter is
 * decremented, except for a DSYNC object in the CLOSE state, whose
 * counter is left alone.
 */
static void msend_retry(mfile *m)
{
  uint8_t *state;
  mhost   *host;

  if(!m->sendwait){
    m->retrycnt = MAKUO_SEND_RETRYCNT;
    return;
  }

  lprintf(2, "%s: send retry count=%02d rid=%06d op=%s state=%s %s\n", 
    __func__,
    m->retrycnt, 
    m->mdata.head.reqid, 
    OPCODE(m->mdata.head.opcode), 
    SSTATE(m->mdata.head.nstate), 
    m->fn);

  for(host=members;host;host=host->next){
    state = get_hoststate(host, m);
    if(state == NULL){
      lprintf(0, "%s: can't alloc state area %s\n",
        __func__, 
        host->hostname);
      continue;
    }
    if(moption.loglevel == 3){
      /* at log level 3 only hosts still in the NONE state are listed */
      if(*state == MAKUO_RECVSTATE_NONE){
        lprintf(0, "%s:   %s %s(%s)\n", 
          __func__, 
          RSTATE(*state), 
          inet_ntoa(host->ad), 
          host->hostname);
      }
    }else{
      lprintf(4, "%s:   %s %s(%s)\n", 
        __func__, 
        RSTATE(*state), 
        inet_ntoa(host->ad), 
        host->hostname);
    }
  }

  if(m->mdata.head.opcode == MAKUO_OP_DSYNC &&
     m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
    return;
  }
  m->retrycnt--;
}

/*
 * Send the packet held in m exactly once, then release m through
 * msend_mfdel().  The result of msend_packet() is not examined.
 * m is handed to mfdel() inside msend_mfdel(), so callers should treat
 * it as gone after this call.
 */
static void msend_shot(int s, mfile *m)
{
  msend_packet(s, &(m->mdata), &(m->addr));
  msend_mfdel(m);
}

/******************************************************************
*
* send common functions (public)
*
*******************************************************************/
/*
 * Public entry point: perform one send step for m on socket s.
 *
 * A NULL m is ignored.  Otherwise the retry bookkeeping is updated,
 * m->lastsend is refreshed through mtimeget(), and the object is
 * dispatched on its ACK flag: objects carrying MAKUO_FLAG_ACK go to
 * msend_ack(), all others to msend_req().
 */
void msend(int s, mfile *m)
{
  if(m == NULL){
    return;
  }
  msend_retry(m);
  mtimeget(&m->lastsend);

  if((m->mdata.head.flags & MAKUO_FLAG_ACK) == 0){
    msend_req(s, m); /* request path */
    return;
  }
  msend_ack(s, m);   /* reply (ack) path */
}

/******************************************************************
*
* ack send functions (for destination node tasks)
*
*******************************************************************/
/* PING ack: send the prepared packet once and release m (see msend_shot). */
static void msend_ack_ping(int s, mfile *m)
{
  msend_shot(s, m);
}

/*
 * SEND ack: when m->markcount is non-zero, the m->mark array
 * (m->marksize 32-bit entries) is copied into the packet payload and
 * head.szdata is set to its byte length.  The packet is then sent once
 * and m is released through msend_shot().
 */
static void msend_ack_send(int s, mfile *m)
{
  if(m->markcount != 0){
    size_t markbytes = m->marksize * sizeof(uint32_t);

    m->mdata.head.szdata = markbytes;
    memcpy(m->mdata.data, m->mark, markbytes);
  }
  msend_shot(s, m);
}

/* MD5 ack: send the prepared packet once and release m (see msend_shot). */
static void msend_ack_md5(int s, mfile *m)
{
  msend_shot(s, m);
}

/* DSYNC ack: send the prepared packet once and release m (see msend_shot). */
static void msend_ack_dsync(int s, mfile *m)
{
  msend_shot(s, m);
}

/*
 * DEL ack: log the call at level 9, then send the prepared packet once
 * and release m (see msend_shot).
 */
static void msend_ack_del(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  msend_shot(s, m);
}

/*
 * Dispatch an ack object to the handler for its opcode.
 * MAKUO_OP_EXIT and unknown opcodes are ignored: nothing is sent and m
 * is left untouched.
 */
static void msend_ack(int s, mfile *m)
{
  switch(m->mdata.head.opcode){
    case MAKUO_OP_PING:
      msend_ack_ping(s, m);
      break;
    case MAKUO_OP_SEND:
      msend_ack_send(s, m);
      break;
    case MAKUO_OP_MD5:
      msend_ack_md5(s, m);
      break;
    case MAKUO_OP_DSYNC:
      msend_ack_dsync(s, m);
      break;
    case MAKUO_OP_DEL:
      msend_ack_del(s, m);
      break;
    /* add handlers for new opcodes here */
    case MAKUO_OP_EXIT:
    default:
      break;
  }
}

/******************************************************************
*
* req send functions (for source node tasks)
*
*******************************************************************/
/*
 * SEND request, BREAK state: log the file name at level 9, send the
 * packet once and release m through msend_mfdel().
 */
static void msend_req_send_break(int s, mfile *m)
{
  lprintf(9, "%s: BREAK %s\n", __func__, m->fn);
  msend_packet(s, &(m->mdata), &(m->addr));
  msend_mfdel(m);
}

/*
 * SEND request, STAT state, first pass: build the STAT payload and send it.
 *
 * Payload layout, in order:
 *   mstat header (fields converted with htonl/htons),
 *   file name  (m->fn, no terminator),
 *   link name  (m->ln, no terminator),
 *   st_rdev as two 32-bit words in network order, high word first.
 *
 * When m has no console (m->comm is NULL) the object is released and
 * nothing is sent.  When the payload would exceed MAKUO_BUFFER_SIZE the
 * error is reported and the function returns without sending and
 * without changing initstate/sendwait.
 */
static void msend_req_send_stat_init(int s, mfile *m)
{
  mstat    fs;
  uint64_t rdev;
  uint32_t word;
  size_t   fnlen;
  size_t   lnlen;

  if(!m->comm){
    msend_mfdel(m);
    return;
  }
  fnlen = strlen(m->fn);
  lnlen = strlen(m->ln);

  m->mdata.p = m->mdata.data;
  m->mdata.head.szdata  = sizeof(fs);
  m->mdata.head.szdata += fnlen;
  m->mdata.head.szdata += lnlen;
  m->mdata.head.szdata += sizeof(uint64_t);
  if(m->mdata.head.szdata > MAKUO_BUFFER_SIZE){
    lprintf(0, "%s: buffer size over size=%d file=%s\n", __func__, m->mdata.head.szdata, m->fn);
    cprintf(0, m->comm, "error: buffer size over size=%d file=%s\n", m->mdata.head.szdata, m->fn);
    return;
  }
  fs.mode  = htonl(m->fs.st_mode);
  fs.uid   = htons(m->fs.st_uid);
  fs.gid   = htons(m->fs.st_gid);
  fs.sizel = htonl((uint32_t)(m->fs.st_size & 0xFFFFFFFF));
  fs.sizeh = htonl((uint32_t)(m->fs.st_size >> 32));
  fs.mtime = htonl(m->fs.st_mtime);
  fs.ctime = htonl(m->fs.st_ctime);
  fs.fnlen = htons(fnlen);
  fs.lnlen = htons(lnlen);
  memcpy(m->mdata.p, &fs, sizeof(fs));
  m->mdata.p += sizeof(fs);

  /* names are length-prefixed through fs.fnlen/fs.lnlen: copy the bytes only */
  memcpy(m->mdata.p, m->fn, fnlen);
  m->mdata.p += fnlen;
  memcpy(m->mdata.p, m->ln, lnlen);
  m->mdata.p += lnlen;

  /*
   * The write position is not necessarily 4-byte aligned after the
   * variable-length names, so the words are stored with memcpy instead
   * of through a casted uint32_t pointer.
   */
  rdev = (uint64_t)(m->fs.st_rdev);
  word = htonl((uint32_t)(rdev >> 32));
  memcpy(m->mdata.p, &word, sizeof(word));
  m->mdata.p += sizeof(word);
  word = htonl((uint32_t)(rdev & 0xFFFFFFFF));
  memcpy(m->mdata.p, &word, sizeof(word));
  m->mdata.p += sizeof(word);

  m->sendwait  = 1;
  m->initstate = 0;
  ack_clear(m, -1);
  msend_packet(s, &(m->mdata), &(m->addr));
}

static void msend_req_send_stat(int s, mfile *m)
{
  uint8_t *r;
  mhost   *t;

  if(m->initstate){
    msend_req_send_stat_init(s, m);
    return;
  }
  if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }
  lprintf(9,"%s: STAT %s\n", __func__, m->fn);
  if(m->dryrun){
    if(ack_check(m, MAKUO_RECVSTATE_UPDATE) != 1){
      if(!m->sendto){
        for(t=members;t;t=t->next){
          if(r = get_hoststate(t, m)){
            if(*r == MAKUO_RECVSTATE_UPDATE)
              cprintf(3, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_SKIP)
              cprintf(3, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_READONLY)
              cprintf(3, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
          }
        }
      }else{
        t = member_get(&(m->addr.sin_addr));
        if(r = get_hoststate(t, m)){
          if(*r == MAKUO_RECVSTATE_UPDATE)
            cprintf(3, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
          if(*r == MAKUO_RECVSTATE_SKIP)
            cprintf(3, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
          if(*r == MAKUO_RECVSTATE_READONLY)
            cprintf(3, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
        }
      }
    }else{
      if(m->comm){
        if(m->comm->loglevel == 0){
          cprintf(0, m->comm, "[%s]\r\n", m->fn);
        }else{
          if(!m->sendto){
            for(t=members;t;t=t->next){
              r = get_hoststate(t, m);
              if(*r == MAKUO_RECVSTATE_UPDATE)
                cprintf(1, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
              if(*r == MAKUO_RECVSTATE_SKIP)
                cprintf(2, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
              if(*r == MAKUO_RECVSTATE_READONLY)
                cprintf(2, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
            }
          }else{
            t = member_get(&(m->addr.sin_addr));
            r = get_hoststate(t, m);
            if(*r == MAKUO_RECVSTATE_UPDATE)
              cprintf(1, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_SKIP)
              cprintf(2, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_READONLY)
              cprintf(2, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
          }
        }
      }
    }
    m->initstate = 1;
    m->mdata.head.ostate = m->mdata.head.nstate;
    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  }else{
    if(ack_check(m, MAKUO_RECVSTATE_UPDATE) != 1){
      if(!m->sendto){
        for(t=members;t;t=t->next){
          if(r = get_hoststate(t, m)){
            if(*r == MAKUO_RECVSTATE_UPDATE)
              cprintf(3, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_SKIP)
              cprintf(3, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
            if(*r == MAKUO_RECVSTATE_READONLY)
              cprintf(3, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
          }
        }
      }else{
        t = member_get(&(m->addr.sin_addr));
        if(r = get_hoststate(t, m)){
          if(*r == MAKUO_RECVSTATE_UPDATE)
            cprintf(3, m->comm, "(dryrun) update %s:%s\r\n", t->hostname, m->fn);
          if(*r == MAKUO_RECVSTATE_SKIP)
            cprintf(3, m->comm, "(dryrun) skip   %s:%s\r\n", t->hostname, m->fn);
          if(*r == MAKUO_RECVSTATE_READONLY)
            cprintf(3, m->comm, "(dryrun) skipro %s:%s\r\n", t->hostname, m->fn);
        }
      }
      m->initstate = 1;
      m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
    }else{
      lprintf(1, "%s: update %s\n", __func__, m->fn);
      if(m->comm){
        if(m->comm->loglevel == 0){
          cprintf(0, m->comm, "[%s]\r\n", m->fn);
        }else{
          if(!m->sendto){
            for(t=members;t;t=t->next){
              if(r = get_hoststate(t, m)){
                if(*r == MAKUO_RECVSTATE_UPDATE)
                  cprintf(1, m->comm, "update %s:%s\r\n", t->hostname, m->fn);
                if(*r == MAKUO_RECVSTATE_SKIP)
                  cprintf(2, m->comm, "skip   %s:%s\r\n", t->hostname, m->fn);
                if(*r == MAKUO_RECVSTATE_READONLY)
                  cprintf(2, m->comm, "skipro %s:%s\r\n", t->hostname, m->fn);
              }
            }
          }else{
            t = member_get(&(m->addr.sin_addr));
            if(r = get_hoststate(t, m)){
              if(*r == MAKUO_RECVSTATE_UPDATE)
                cprintf(1, m->comm, "update %s:%s\r\n", t->hostname, m->fn);
              if(*r == MAKUO_RECVSTATE_SKIP)
                cprintf(2, m->comm, "skip   %s:%s\r\n", t->hostname, m->fn);
              if(*r == MAKUO_RECVSTATE_READONLY)
                cprintf(2, m->comm, "skipro %s:%s\r\n", t->hostname, m->fn);
            }
          }
        }
      }
      m->initstate = 1;
      m->mdata.head.ostate = m->mdata.head.nstate;
      m->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
    }
  }
}

/*
 * SEND request, OPEN state, first pass.
 *
 * Marks m as waiting for replies and clears the UPDATE acks.  For a
 * symlink or any other non-regular file the OPEN packet is sent right
 * away.  For a regular file the source file is opened read-only first;
 * if that fails nothing is sent, the error is reported on the console
 * and in the log, and m moves to the CLOSE state.
 */
static void msend_req_send_open_init(int s, mfile *m)
{
  int openerr;

  m->sendwait  = 1;
  m->initstate = 0;
  ack_clear(m, MAKUO_RECVSTATE_UPDATE);

  /*----- symlink -----*/
  if(S_ISLNK(m->fs.st_mode) || !S_ISREG(m->fs.st_mode)){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }

  m->fd = open(m->fn, O_RDONLY, 0);
  if(m->fd != -1){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }

  /* save errno at once: the reporting calls below may overwrite it */
  openerr = errno;
  m->sendwait  = 0;
  m->initstate = 1;
  m->mdata.head.ostate = m->mdata.head.nstate;
  m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  cprintf(0, m->comm, "error: file open error errno=%d %s\n", openerr, m->fn);
  lprintf(0, "%s: open error errno=%d %s\n", __func__, openerr, m->fn);
}

/*
 * SEND request, OPEN state.
 *
 * First pass: run the OPEN initialisation.  While waiting: resend the
 * packet.  Afterwards, if ack_check() still reports 1 for the UPDATE
 * state, go back to waiting with the UPDATE acks cleared.  Otherwise
 * record the current state in head.ostate and move on: regular files
 * continue with DATA (seqno reset to 0), everything else with CLOSE.
 */
static void msend_req_send_open(int s, mfile *m)
{
  if(m->initstate){
    msend_req_send_open_init(s, m);
    return;
  }
  if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }
  lprintf(9, "%s: OPEN %s\n", __func__, m->fn);

  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1){
    m->sendwait = 1;
    ack_clear(m, MAKUO_RECVSTATE_UPDATE);
    return;
  }

  m->mdata.head.ostate = m->mdata.head.nstate;
  if(!S_ISLNK(m->fs.st_mode) && S_ISREG(m->fs.st_mode)){
    /* regular file: start transferring blocks */
    m->mdata.head.seqno  = 0;
    m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
  }else{
    /* no file body to transfer */
    m->initstate = 1;
    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  }
}

/*
 * SEND request, DATA state: resend one block listed in m->mark.
 *
 * With no marks left the object moves to CLOSE.  Otherwise the last
 * mark is taken off the list, the corresponding block is read from
 * m->fd and sent.  A read error or an unexpected end of file is logged
 * and the block is not sent.  When the list becomes empty the object
 * moves to the MARK state.
 */
static void msend_req_send_markdata(int s, mfile *m)
{
  int r;
  if(!m->markcount){
    /* close */
    m->initstate = 1;
    m->mdata.head.ostate = m->mdata.head.nstate;
    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
    return;
  }
  m->markcount--;
  lprintf(4, "%s: block send retry seqno=%u count=%u\n", __func__, m->mark[m->markcount], m->markcount);
  m->mdata.head.seqno = m->mark[m->markcount];
  /*
   * Compute the byte offset in off_t: the product of the block number
   * and the block size does not fit in 32 bits for files over 4GiB.
   * (Offsets that large also need a 64-bit off_t in this build.)
   */
  lseek(m->fd, (off_t)m->mdata.head.seqno * MAKUO_BUFFER_SIZE, SEEK_SET);
  r = read(m->fd, m->mdata.data, MAKUO_BUFFER_SIZE);
  if(r>0){
    m->mdata.head.szdata = r;
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    if(!r){
      lprintf(0, "%s: read eof? seqno=%d\n", __func__, m->mdata.head.seqno);
    }else{
      lprintf(0, "%s: read err! seqno=%d errno=%d\n", __func__, m->mdata.head.seqno, errno);
    }
  }
  if(m->markcount == 0){
    m->initstate = 1;
    m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
  }
}

/*
 * SEND request, DATA state: send the next block of the file.
 *
 * A pending mark (block listed in m->mark) is served first; otherwise
 * the next sequential block (m->seqnonow) is used, and on the very
 * first block the wait span is initialised to 1024.  At end of file
 * the object moves to the MARK state and lickflag is set.  A read
 * error is logged and the block is not sent.
 *
 * After each call waitcount is decremented; once it has reached zero
 * the object starts waiting (sendwait) with the UPDATE and OPEN acks
 * cleared.
 */
static void msend_req_send_filedata(int s, mfile *m)
{
  int readsize;
  if(m->markcount){
    m->markcount--;
    m->mdata.head.seqno = m->mark[m->markcount];
  }else{
    if(m->seqnonow == 0){
      m->waitspan  = 1024;
      m->waitcount = m->waitspan;
    }
    m->mdata.head.seqno = m->seqnonow++;
  }
  /*
   * Compute the byte offset in off_t: the product of the block number
   * and the block size does not fit in 32 bits for files over 4GiB.
   * (Offsets that large also need a 64-bit off_t in this build.)
   */
  lseek(m->fd, (off_t)m->mdata.head.seqno * MAKUO_BUFFER_SIZE, SEEK_SET);
  readsize = read(m->fd, m->mdata.data, MAKUO_BUFFER_SIZE);
  if(readsize > 0){
    m->mdata.head.szdata = readsize;
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    if(readsize == -1){
      /* err */
      lprintf(0, "%s: read error! seqno=%d errno=%d %s\n", __func__, m->mdata.head.seqno, errno, m->fn);
    }else{
      /* eof */
      lprintf(4, "%s: block send count=%d %s\n", __func__, m->mdata.head.seqno, m->fn);
      m->mdata.head.seqno  = 0;
      m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
      m->initstate = 1;
      m->lickflag  = 1;
    }
  }
  if(m->waitcount){
    m->waitcount--;
  }else{
    m->sendwait = 1;
    ack_clear(m, MAKUO_RECVSTATE_UPDATE);
    ack_clear(m, MAKUO_RECVSTATE_OPEN);
  }
}

/*
 * SEND request, DATA state.
 *
 * Without a console the object switches to BREAK.  While waiting, an
 * empty packet carrying MAKUO_FLAG_WAIT is sent.  On the first call
 * after such a wait the flag is removed and the wait span is adapted
 * from the ratio markcount*100/waitspan: above 50 the span is halved
 * (only while it is larger than 16), below 10 it is doubled.  Then one
 * block is sent: a marked block when lickflag is set, the next file
 * block otherwise.
 */
static void msend_req_send_data(int s, mfile *m)
{
  uint32_t markratio;

  if(!m->comm){
    m->mdata.head.seqno  = 0;
    m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
    return;
  }

  if(m->sendwait){
    m->mdata.head.seqno  = 0;
    m->mdata.head.szdata = 0;
    m->mdata.head.flags |= MAKUO_FLAG_WAIT;
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }

  if(m->mdata.head.flags & MAKUO_FLAG_WAIT){
    markratio = m->markcount * 100 / m->waitspan;
    m->mdata.head.flags &= ~MAKUO_FLAG_WAIT;
    /* slow */
    if(m->waitspan > 16 && markratio > 50){
      m->waitspan /= 2;
    }
    /* fast */
    if(markratio < 10){
      m->waitspan *= 2;
    }
    m->waitcount = m->waitspan;
    lprintf(0,"%s: waitspan=%u markcount=%d\n", __func__, m->waitspan, m->markcount);    
  }

  if(!m->lickflag){
    msend_req_send_filedata(s, m); /* send data  */
  }else{
    msend_req_send_markdata(s, m); /* send retry */
  }
}

/*
 * SEND request, MARK state, (re)start of a round: clear the UPDATE,
 * OPEN and MARK acks, flag m as waiting for replies and send the packet.
 */
static void msend_req_send_mark_init(int s, mfile *m)
{
  m->initstate = 0;
  m->sendwait  = 1;

  ack_clear(m, MAKUO_RECVSTATE_UPDATE);
  ack_clear(m, MAKUO_RECVSTATE_OPEN);
  ack_clear(m, MAKUO_RECVSTATE_MARK);

  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * SEND request, MARK state.
 *
 * First pass: start a MARK round.  While waiting: resend the packet.
 * Afterwards, as long as ack_check() reports 1 for the UPDATE or the
 * OPEN state, another MARK round is started; otherwise the object
 * returns to the DATA state.
 */
static void msend_req_send_mark(int s, mfile *m)
{
  if(m->initstate){
    msend_req_send_mark_init(s, m);
    return;
  }
  if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }
  /* checked in this order; the second check runs only if the first is not 1 */
  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1 ||
     ack_check(m, MAKUO_RECVSTATE_OPEN)   == 1){
    msend_req_send_mark_init(s, m);
    return;
  }
  lprintf(9, "%s: MARK mark=%d %s\n", __func__, m->markcount, m->fn);
  m->mdata.head.nstate = MAKUO_SENDSTATE_DATA;
}

/*
 * SEND request, CLOSE state, (re)start of a round: clear the UPDATE,
 * OPEN and MARK acks, flag m as waiting for replies and send the packet.
 */
static void msend_req_send_close_init(int s, mfile *m)
{
  m->initstate = 0;
  m->sendwait  = 1;

  ack_clear(m, MAKUO_RECVSTATE_UPDATE);
  ack_clear(m, MAKUO_RECVSTATE_OPEN);
  ack_clear(m, MAKUO_RECVSTATE_MARK);

  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * SEND request, CLOSE state.
 *
 * First pass: start a CLOSE round.  While waiting: resend the packet.
 * Afterwards, as long as ack_check() reports 1 for the UPDATE, OPEN or
 * MARK state, another CLOSE round is started.  Otherwise the object
 * moves to LAST; when the previous state (head.ostate) was MARK, DATA
 * or OPEN the completion is logged at level 1 first.
 */
static void msend_req_send_close(int s, mfile *m)
{
  if(m->initstate){
    msend_req_send_close_init(s, m);
    return;
  }
  if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
    return;
  }
  /* checked in this order; evaluation stops at the first result of 1 */
  if(ack_check(m, MAKUO_RECVSTATE_UPDATE) == 1 ||
     ack_check(m, MAKUO_RECVSTATE_OPEN)   == 1 ||
     ack_check(m, MAKUO_RECVSTATE_MARK)   == 1){
    msend_req_send_close_init(s, m);
    return;
  }
  switch(m->mdata.head.ostate){
    case MAKUO_SENDSTATE_MARK:
    case MAKUO_SENDSTATE_DATA:
    case MAKUO_SENDSTATE_OPEN:
      lprintf(1,"%s: update complate %s \n", __func__, m->fn);
      break;
    default:
      break;
  }
  m->mdata.head.nstate = MAKUO_SENDSTATE_LAST;
}

/*
 * SEND request, LAST state: log the file name at level 9, send the
 * packet once and release m through msend_mfdel().
 */
static void msend_req_send_last(int s, mfile *m)
{
  lprintf(9, "%s: LAST %s\n", __func__, m->fn);
  msend_packet(s, &(m->mdata), &(m->addr));
  msend_mfdel(m);
}

/*----- send -----*/
/*
 * Run one step of a SEND request according to its current state.
 * An object without a console (m->comm is NULL) is released at once;
 * an unknown state is ignored.
 */
static void msend_req_send(int s, mfile *m)
{
  if(m->comm == NULL){
    msend_mfdel(m);
    return;
  }

  switch(m->mdata.head.nstate){
    case MAKUO_SENDSTATE_STAT:
      msend_req_send_stat(s, m);
      break;
    case MAKUO_SENDSTATE_OPEN:
      msend_req_send_open(s, m);
      break;
    case MAKUO_SENDSTATE_DATA:
      msend_req_send_data(s, m);
      break;
    case MAKUO_SENDSTATE_MARK:
      msend_req_send_mark(s, m);
      break;
    case MAKUO_SENDSTATE_CLOSE:
      msend_req_send_close(s, m);
      break;
    case MAKUO_SENDSTATE_LAST:
      msend_req_send_last(s, m);
      break;
    case MAKUO_SENDSTATE_BREAK:
      msend_req_send_break(s, m);
      break;
    default:
      break;
  }
}

/*
 * MD5 request, OPEN state, first pass: clear all acks (ack_clear with
 * -1), flag m as waiting, force the state to OPEN and send the packet.
 */
static void msend_req_md5_open_init(int s, mfile *m)
{
  m->initstate = 0;
  m->sendwait  = 1;
  ack_clear(m, -1);

  m->mdata.head.nstate = MAKUO_SENDSTATE_OPEN;
  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * MD5 request, OPEN state.
 * First pass: log and run the OPEN initialisation.  While waiting:
 * resend the packet.  Afterwards: move on to CLOSE with initstate set.
 */
static void msend_req_md5_open(int s, mfile *m)
{
  if(m->initstate){
    lprintf(9,"%s: %s\n", __func__, m->fn);
    msend_req_md5_open_init(s, m);
  }else if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    m->initstate = 1;
    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  }
}

/*
 * MD5 request, CLOSE state, first pass: clear the MD5OK and MD5NG
 * acks, flag m as waiting, force the state to CLOSE and send the packet.
 */
static void msend_req_md5_close_init(int s, mfile *m)
{
  m->initstate = 0;
  m->sendwait  = 1;
  ack_clear(m, MAKUO_RECVSTATE_MD5OK);
  ack_clear(m, MAKUO_RECVSTATE_MD5NG);

  m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * MD5 request, CLOSE state.
 * First pass: run the CLOSE initialisation.  While waiting: resend the
 * packet.  Afterwards: log and release m through msend_mfdel().
 */
static void msend_req_md5_close(int s, mfile *m)
{
  if(m->initstate){
    msend_req_md5_close_init(s, m);
  }else if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    lprintf(9,"%s: %s\n", __func__, m->fn);
    msend_mfdel(m);
  }
}

/*----- md5 -----*/
/*
 * Run one step of an MD5 request.  Only the OPEN and CLOSE states are
 * handled; any other state is ignored.
 */
static void msend_req_md5(int s, mfile *m)
{
  if(m->mdata.head.nstate == MAKUO_SENDSTATE_OPEN){
    msend_req_md5_open(s, m);
  }else if(m->mdata.head.nstate == MAKUO_SENDSTATE_CLOSE){
    msend_req_md5_close(s, m);
  }
}

/*
 * DSYNC request, OPEN state, first pass: flag m as waiting for replies
 * and send the packet.  No acks are cleared here.
 */
static void msend_req_dsync_open_init(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  m->initstate = 0;
  m->sendwait  = 1;

  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * DSYNC request, OPEN state.
 * First pass: run the OPEN initialisation.  While waiting: resend the
 * packet.  Afterwards: move on to CLOSE with initstate set.
 */
static void msend_req_dsync_open(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  if(m->initstate){
    msend_req_dsync_open_init(s, m);
  }else if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    m->initstate = 1;
    m->mdata.head.nstate = MAKUO_SENDSTATE_CLOSE;
  }
}

/*
 * DSYNC request, CLOSE state, first pass: flag m as waiting and clear
 * the OPEN acks.  No packet is sent from here.
 */
static void msend_req_dsync_close_init(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  m->initstate = 0;
  m->sendwait  = 1;

  ack_clear(m, MAKUO_RECVSTATE_OPEN);
}

/*
 * DSYNC request, CLOSE state.
 * First pass: run the CLOSE initialisation.  While waiting: do nothing
 * (no resend in this state).  Afterwards: log and release m through
 * msend_mfdel().
 */
static void msend_req_dsync_close(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  if(m->initstate){
    msend_req_dsync_close_init(s, m);
  }else if(!m->sendwait){
    lprintf(9,"%s: %s\n", __func__, m->fn);
    msend_mfdel(m);
  }
}

/*
 * DSYNC request, BREAK state, first pass: clear all acks (ack_clear
 * with -1), flag m as waiting and send the packet.
 */
static void msend_req_dsync_break_init(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  m->initstate = 0;
  m->sendwait  = 1;
  ack_clear(m, -1);

  msend_packet(s, &(m->mdata), &(m->addr));
}

/*
 * DSYNC request, BREAK state.
 * First pass: run the BREAK initialisation.  While waiting: resend the
 * packet.  Afterwards: log and release m through msend_mfdel().
 */
static void msend_req_dsync_break(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);
  if(m->initstate){
    msend_req_dsync_break_init(s, m);
  }else if(m->sendwait){
    msend_packet(s, &(m->mdata), &(m->addr));
  }else{
    lprintf(9,"%s: m=%p\n", __func__, m);
    msend_mfdel(m);
  }
}

/*----- dsync -----*/
/*
 * Run one step of a DSYNC request.
 *
 * An object that has lost its console (m->comm is NULL) is switched to
 * the BREAK state with initstate set, unless it is already in BREAK or
 * LAST.  The step for the resulting state is then executed; in LAST
 * the packet is sent once and m is released.  Other states are ignored.
 */
static void msend_req_dsync(int s, mfile *m)
{
  lprintf(9, "%s:\n", __func__);

  if(m->mdata.head.nstate != MAKUO_SENDSTATE_LAST &&
     !m->comm &&
     m->mdata.head.nstate != MAKUO_SENDSTATE_BREAK){
    m->initstate = 1;
    m->mdata.head.nstate = MAKUO_SENDSTATE_BREAK;
  }

  switch(m->mdata.head.nstate){
    case MAKUO_SENDSTATE_OPEN:
      msend_req_dsync_open(s, m);
      break;
    case MAKUO_SENDSTATE_CLOSE:
      msend_req_dsync_close(s, m);
      break;
    case MAKUO_SENDSTATE_BREAK:
      msend_req_dsync_break(s, m);
      break;
    case MAKUO_SENDSTATE_LAST:
      msend_shot(s, m);
      break;
    default:
      break;
  }
}

/*
 * DEL request, MARK state: act on the object linked to m (m->link, the
 * dsync object).  When the address of that object is still found by
 * member_get(), an ack with MAKUO_RECVSTATE_CLOSE is created for it
 * through mkack(); otherwise its lastrecv.tv_sec is set to 1.
 *
 * NOTE(review): m->link is dereferenced without a NULL check here,
 * while msend_req_del() tests it in the BREAK state - confirm that it
 * is always set in the MARK state.
 */
static void msend_req_del_mark(int s, mfile *m)
{
  mfile *dsync;

  lprintf(9, "%s:\n", __func__);
  dsync = m->link;
  if(!member_get(&(dsync->addr.sin_addr))){
    dsync->lastrecv.tv_sec = 1;
    return;
  }
  mkack(&(dsync->mdata), &(dsync->addr), MAKUO_RECVSTATE_CLOSE);
}

/*
 * DEL request, STAT state: turn records read from m->pipe into
 * individual DEL requests.
 *
 * Three phases, selected by m->pid and m->pipe:
 *   pid == 0   : the producer process is gone.  Wait until no other DEL
 *                object with the same seqno is in a non-STAT state,
 *                then move m to the MARK state.
 *   pipe == -1 : the pipe is closed.  Poll the producer with a
 *                non-blocking waitpid() and clear m->pid once reaped.
 *   otherwise  : read one record (1 status byte, 32-bit mode, 16-bit
 *                name length, name bytes) and queue a new request for it.
 *
 * NOTE(review): only the first read() is checked.  The results of the
 * three following reads are ignored, len is used unvalidated as an
 * index into d->fn and as a copy length into the payload, and the
 * result of mkreq() is not tested for NULL - confirm that the writer
 * side guarantees complete, bounded records.
 */
static void msend_req_del_stat(int s, mfile *m)
{
  int r;
  mfile *d;
  uint8_t stat;
  uint16_t len;
  uint32_t mod;

  if(m->pid == 0){
    /* hold back while a DEL object of this seqno is still past STAT */
    for(d=mftop[0];d;d=d->next){
      if((d->mdata.head.opcode == MAKUO_OP_DEL) &&
         (d->mdata.head.nstate != MAKUO_SENDSTATE_STAT) &&
         (d->mdata.head.seqno  == m->mdata.head.seqno)){
        return;
      }
    }
    m->mdata.head.nstate = MAKUO_SENDSTATE_MARK;
    m->initstate = 1;
    m->sendwait  = 0;
    ack_clear(m, -1);
    return;
  }

  if(m->pipe == -1){
    /* pipe already closed: reap the producer without blocking */
    if(waitpid(m->pid, NULL, WNOHANG) == m->pid){
      m->pid  =  0;
    }
    return;
  }

  lprintf(9, "%s:\n", __func__);
  /* the status byte is read but its value is not used below */
  r = read(m->pipe, &stat, sizeof(stat));
  if(r <= 0){
    /* end of stream or read error: stop reading from the pipe */
    close(m->pipe);
    m->pipe = -1;
    return;
  }

  /* new request object in the OPEN state, built from m's packet and address */
  d = mkreq(&(m->mdata), &(m->addr), MAKUO_SENDSTATE_OPEN);
  read(m->pipe, &mod, sizeof(mod));
  read(m->pipe, &len, sizeof(len));
  read(m->pipe, d->fn, len);
  d->fn[len]    = 0;
  d->fs.st_mode = mod;

  /* payload: mode (network order), name length (network order), name bytes */
  d->mdata.p = d->mdata.data;
  *(uint32_t *)(d->mdata.p) = htonl(mod);
  d->mdata.p += sizeof(mod);
  *(uint16_t *)(d->mdata.p) = htons(len);
  d->mdata.p += sizeof(len);
  memcpy(d->mdata.p, d->fn, len);
  d->mdata.head.flags  = m->mdata.head.flags;
  d->mdata.head.szdata = sizeof(mod) + sizeof(len) + len;
  d->mdata.head.reqid  = getrid();
  d->initstate = 1;
  d->sendwait  = 0;
  d->sendto = 1;
  d->dryrun = m->dryrun;
  d->recurs = m->recurs;
  lprintf(9, "%s: fn=%s rid=%d flags=%d\n", __func__, d->fn, d->mdata.head.reqid, d->mdata.head.flags);
}

/*----- del -----*/
/*
 * Run one step of a DEL request.
 *
 * BREAK : set lastrecv.tv_sec of the linked object (if any) to 1 and
 *         release m.
 * LAST  : release m.
 * STAT  : handled by msend_req_del_stat().
 * Any other state: on the first pass the acks are cleared and m starts
 * waiting; on that pass and while waiting, the MARK state acts on the
 * linked object and every other state sends the packet.  Once the wait
 * is over, initstate is set again for the next round.
 */
static void msend_req_del(int s, mfile *m)
{
  switch(m->mdata.head.nstate){
    case MAKUO_SENDSTATE_BREAK:
      if(m->link){
        m->link->lastrecv.tv_sec = 1;
      }
      msend_mfdel(m);
      return;
    case MAKUO_SENDSTATE_LAST:
      msend_mfdel(m);
      return;
    case MAKUO_SENDSTATE_STAT:
      msend_req_del_stat(s, m);
      return;
    default:
      break;
  }

  if(!m->initstate && !m->sendwait){
    m->initstate = 1;
    return;
  }
  if(m->initstate){
    /* first pass of this state: start a fresh wait */
    m->initstate = 0;
    m->sendwait  = 1;
    ack_clear(m, -1);
  }
  if(m->mdata.head.nstate == MAKUO_SENDSTATE_MARK){
    msend_req_del_mark(s, m);
  }else{
    msend_packet(s, &(m->mdata), &(m->addr));
  }
}

/*----- exit -----*/
/* EXIT request: send the prepared packet once and release m (see msend_shot). */
static void msend_req_exit(int s, mfile *m)
{
  msend_shot(s, m);
}

/*----- ping -----*/
/* PING request: send the prepared packet once and release m (see msend_shot). */
static void msend_req_ping(int s, mfile *m)
{
  msend_shot(s, m);
}

/*----- send request -----*/
/*
 * Dispatch a request object to the handler for its opcode.
 * Unknown opcodes are ignored: nothing is sent and m is left untouched.
 */
static void msend_req(int s, mfile *m)
{
  switch(m->mdata.head.opcode){
    case MAKUO_OP_PING:
      msend_req_ping(s, m);
      break;
    case MAKUO_OP_EXIT:
      msend_req_exit(s, m);
      break;
    case MAKUO_OP_SEND:
      msend_req_send(s, m);
      break;
    case MAKUO_OP_MD5:
      msend_req_md5(s, m);
      break;
    case MAKUO_OP_DSYNC:
      msend_req_dsync(s, m);
      break;
    case MAKUO_OP_DEL:
      msend_req_del(s, m);
      break;
    /* add handlers for new opcodes here */
    default:
      break;
  }
}