... | ... |
@@ -599,6 +599,8 @@ static const char* parse_dispatch_cmd(client_conn_t *conn, struct fd_buf *buf, s |
599 | 599 |
logg("$Moved partial command: %lu\n", (unsigned long)buf->off); |
600 | 600 |
else |
601 | 601 |
logg("$Consumed entire command\n"); |
602 |
+ /* adjust pos to account for the buffer shuffle */ |
|
603 |
+ pos = 0; |
|
602 | 604 |
} |
603 | 605 |
*ppos = pos; |
604 | 606 |
return cmd; |
... | ... |
@@ -610,75 +612,82 @@ static int handle_stream(client_conn_t *conn, struct fd_buf *buf, const struct o |
610 | 610 |
int rc; |
611 | 611 |
size_t pos = *ppos; |
612 | 612 |
size_t cmdlen; |
613 |
- |
|
613 |
+ |
|
614 | 614 |
logg("$mode == MODE_STREAM\n"); |
615 |
- /* we received a chunk, set readtimeout */ |
|
615 |
+ /* we received some data, set readtimeout */ |
|
616 | 616 |
time(&buf->timeout_at); |
617 | 617 |
buf->timeout_at += readtimeout; |
618 |
- if (!buf->chunksize) { |
|
619 |
- /* read chunksize */ |
|
620 |
- if (buf->off >= 4) { |
|
621 |
- uint32_t cs = *(uint32_t*)buf->buffer; |
|
622 |
- buf->chunksize = ntohl(cs); |
|
623 |
- logg("$Got chunksize: %u\n", buf->chunksize); |
|
624 |
- if (!buf->chunksize) { |
|
625 |
- /* chunksize 0 marks end of stream */ |
|
626 |
- conn->scanfd = buf->dumpfd; |
|
627 |
- conn->term = buf->term; |
|
628 |
- buf->dumpfd = -1; |
|
629 |
- buf->mode = buf->group ? MODE_COMMAND : MODE_WAITREPLY; |
|
630 |
- if (buf->mode == MODE_WAITREPLY) |
|
631 |
- buf->fd = -1; |
|
632 |
- logg("$Chunks complete\n"); |
|
633 |
- buf->dumpname = NULL; |
|
634 |
- if ((rc = execute_or_dispatch_command(conn, COMMAND_INSTREAMSCAN, NULL)) < 0) { |
|
635 |
- logg("!Command dispatch failed\n"); |
|
636 |
- if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) { |
|
637 |
- pthread_mutex_lock(&exit_mutex); |
|
638 |
- progexit = 1; |
|
639 |
- pthread_mutex_unlock(&exit_mutex); |
|
640 |
- } |
|
641 |
- *error = 1; |
|
642 |
- return -1; |
|
643 |
- } else { |
|
644 |
- pos = 4; |
|
645 |
- memmove (buf->buffer, &buf->buffer[pos], buf->off - pos); |
|
646 |
- buf->off -= pos; |
|
647 |
- *ppos = 0; |
|
648 |
- buf->id++; |
|
649 |
- return 0; |
|
618 |
+ while (pos <= buf->off) { |
|
619 |
+ if (!buf->chunksize) { |
|
620 |
+ /* read chunksize */ |
|
621 |
+ if (buf->off-pos >= 4) { |
|
622 |
+ uint32_t cs; |
|
623 |
+ memmove(&cs, buf->buffer + pos, 4); |
|
624 |
+ pos += 4; |
|
625 |
+ buf->chunksize = ntohl(cs); |
|
626 |
+ logg("$Got chunksize: %u\n", buf->chunksize); |
|
627 |
+ if (!buf->chunksize) { |
|
628 |
+ /* chunksize 0 marks end of stream */ |
|
629 |
+ conn->scanfd = buf->dumpfd; |
|
630 |
+ conn->term = buf->term; |
|
631 |
+ buf->dumpfd = -1; |
|
632 |
+ buf->mode = buf->group ? MODE_COMMAND : MODE_WAITREPLY; |
|
633 |
+ if (buf->mode == MODE_WAITREPLY) |
|
634 |
+ buf->fd = -1; |
|
635 |
+ logg("$Chunks complete\n"); |
|
636 |
+ buf->dumpname = NULL; |
|
637 |
+ if ((rc = execute_or_dispatch_command(conn, COMMAND_INSTREAMSCAN, NULL)) < 0) { |
|
638 |
+ logg("!Command dispatch failed\n"); |
|
639 |
+ if(rc == -1 && optget(opts, "ExitOnOOM")->enabled) { |
|
640 |
+ pthread_mutex_lock(&exit_mutex); |
|
641 |
+ progexit = 1; |
|
642 |
+ pthread_mutex_unlock(&exit_mutex); |
|
643 |
+ } |
|
644 |
+ *error = 1; |
|
645 |
+ } else { |
|
646 |
+ memmove (buf->buffer, &buf->buffer[pos], buf->off - pos); |
|
647 |
+ buf->off -= pos; |
|
648 |
+ *ppos = 0; |
|
649 |
+ buf->id++; |
|
650 |
+ return 0; |
|
651 |
+ } |
|
650 | 652 |
} |
651 |
- } |
|
652 |
- if (buf->chunksize > buf->quota) { |
|
653 |
- logg("^INSTREAM: Size limit reached, (requested: %lu, max: %lu)\n", |
|
654 |
- (unsigned long)buf->chunksize, (unsigned long)buf->quota); |
|
655 |
- conn_reply_error(conn, "INSTREAM size limit exceeded."); |
|
656 |
- *error = 1; |
|
657 |
- *ppos = pos; |
|
658 |
- return -1; |
|
653 |
+ if (buf->chunksize > buf->quota) { |
|
654 |
+ logg("^INSTREAM: Size limit reached, (requested: %lu, max: %lu)\n", |
|
655 |
+ (unsigned long)buf->chunksize, (unsigned long)buf->quota); |
|
656 |
+ conn_reply_error(conn, "INSTREAM size limit exceeded."); |
|
657 |
+ *error = 1; |
|
658 |
+ *ppos = pos; |
|
659 |
+ return -1; |
|
660 |
+ } else { |
|
661 |
+ buf->quota -= buf->chunksize; |
|
662 |
+ } |
|
663 |
+ logg("$Quota Remaining: %lu\n", buf->quota); |
|
659 | 664 |
} else { |
660 |
- buf->quota -= buf->chunksize; |
|
661 |
- } |
|
662 |
- logg("$Quota: %lu\n", buf->quota); |
|
663 |
- pos = 4; |
|
664 |
- } else |
|
665 |
- return -1; |
|
666 |
- } else |
|
667 |
- pos = 0; |
|
668 |
- if (pos + buf->chunksize < buf->off) |
|
669 |
- cmdlen = buf->chunksize; |
|
670 |
- else |
|
671 |
- cmdlen = buf->off - pos; |
|
672 |
- buf->chunksize -= cmdlen; |
|
673 |
- if (cli_writen(buf->dumpfd, buf->buffer + pos, cmdlen) < 0) { |
|
674 |
- conn_reply_error(conn, "Error writing to temporary file"); |
|
675 |
- logg("!INSTREAM: Can't write to temporary file.\n"); |
|
676 |
- *error = 1; |
|
677 |
- } |
|
678 |
- logg("$Processed %lu bytes of chunkdata\n", cmdlen); |
|
679 |
- pos += cmdlen; |
|
680 |
- if (pos == buf->off) { |
|
681 |
- buf->off = 0; |
|
665 |
+ /* need more data, so return and wait for some */ |
|
666 |
+ *ppos = pos; |
|
667 |
+ return -1; |
|
668 |
+ } |
|
669 |
+ } |
|
670 |
+ if (pos + buf->chunksize < buf->off) |
|
671 |
+ cmdlen = buf->chunksize; |
|
672 |
+ else |
|
673 |
+ cmdlen = buf->off - pos; |
|
674 |
+ buf->chunksize -= cmdlen; |
|
675 |
+ if (cli_writen(buf->dumpfd, buf->buffer + pos, cmdlen) < 0) { |
|
676 |
+ conn_reply_error(conn, "Error writing to temporary file"); |
|
677 |
+ logg("!INSTREAM: Can't write to temporary file.\n"); |
|
678 |
+ *error = 1; |
|
679 |
+ } |
|
680 |
+ logg("$Processed %lu bytes of chunkdata\n", cmdlen); |
|
681 |
+ pos += cmdlen; |
|
682 |
+ if (pos == buf->off) { |
|
683 |
+ buf->off = 0; |
|
684 |
+ pos = 0; |
|
685 |
+ /* need more data, so return and wait for some */ |
|
686 |
+ *ppos = pos; |
|
687 |
+ return -1; |
|
688 |
+ } |
|
682 | 689 |
} |
683 | 690 |
*ppos = pos; |
684 | 691 |
return 0; |