From: Fredrik Tolf Date: Sun, 8 Jun 2008 12:02:18 +0000 (+0200) Subject: First potentially working version of the transsocket. X-Git-Tag: 1.3~18 X-Git-Url: http://git.dolda2000.com/gitweb/?a=commitdiff_plain;h=1ff9e8ea002024dee061378ae5871619d6b979dd;p=doldaconnect.git First potentially working version of the transsocket. --- diff --git a/daemon/fnet-dc.c b/daemon/fnet-dc.c index 36e4297..8f83123 100644 --- a/daemon/fnet-dc.c +++ b/daemon/fnet-dc.c @@ -1623,14 +1623,14 @@ static void cmd_direction(struct socket *sk, struct dcpeer *peer, char *cmd, cha peer->close = 1; return; } - transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer)); + transfer = newupload(peer->fn, &dcnet, peer->wcsname, (peer->trpipe = mktrpipe(peer))->back); } else { if((transfer = finddownload(peer->wcsname)) == NULL) { peer->close = 1; return; } - transferattach(transfer, peer->trpipe = mktrpipe(peer)); + transferattach(transfer, (peer->trpipe = mktrpipe(peer))->back); transfersetstate(transfer, TRNS_HS); } transfersetnick(transfer, peer->wcsname); @@ -1675,10 +1675,10 @@ static void cmd_peerlock(struct socket *sk, struct dcpeer *peer, char *cmd, char return; } peer->direction = TRNSD_UP; - transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer)); + transfer = newupload(peer->fn, &dcnet, peer->wcsname, (peer->trpipe = mktrpipe(peer))->back); } else { peer->direction = TRNSD_DOWN; - transferattach(transfer, peer->trpipe = mktrpipe(peer)); + transferattach(transfer, (peer->trpipe = mktrpipe(peer))->back); transfersetstate(transfer, TRNS_HS); } transfersetnick(transfer, peer->wcsname); @@ -2675,22 +2675,6 @@ static struct command peercmds[] = }; #undef cc -static struct socket *mktrpipe(struct dcpeer *peer) -{ - struct socket *sk; - - sk = netsockpipe(); - sk->data = peer; - return(sk); -} - -static void dctransdetach(struct transfer *transfer, struct dcpeer *peer) -{ - CBUNREG(transfer, trans_filterout, peer); - peer->transfer = NULL; - peer->close = 1; -} - static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer) { int ret; @@ -2703,26 +2687,30 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer) { if(sockqueueleft(peer->sk) > 0) { - if((buf = transfergetdata(transfer, &bufsize)) != NULL) + if((buf = sockgetinbuf(peer->trpipe, &bufsize)) != NULL) { - if(peer->compress == CPRS_NONE) - { - sockqueue(peer->sk, buf, bufsize); - } else if(peer->compress == CPRS_ZLIB) { - cstr = peer->cprsdata; - cstr->next_in = buf; - cstr->avail_in = bufsize; - while(cstr->avail_in > 0) + if((transfer->endpos >= 0) && (transfer->curpos + bufsize >= transfer->endpos)) + bufsize = transfer->endpos - transfer->curpos; + if(bufsize > 0) { + if(peer->compress == CPRS_NONE) { - cstr->next_out = outbuf; - cstr->avail_out = sizeof(outbuf); - if((ret = deflate(cstr, 0)) != Z_OK) + sockqueue(peer->sk, buf, bufsize); + } else if(peer->compress == CPRS_ZLIB) { + cstr = peer->cprsdata; + cstr->next_in = buf; + cstr->avail_in = bufsize; + while(cstr->avail_in > 0) { - flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret); - freedcpeer(peer); - return; + cstr->next_out = outbuf; + cstr->avail_out = sizeof(outbuf); + if((ret = deflate(cstr, 0)) != Z_OK) + { + flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret); + freedcpeer(peer); + return; + } + sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out); } - sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out); } } free(buf); @@ -2766,19 +2754,40 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer) } } -static void dctransendofdata(struct transfer *transfer, struct dcpeer *peer) +void trpiperead(struct socket *sk, struct dcpeer *peer) +{ + dctransgotdata(peer->transfer, peer); +} + +void trpipewrite(struct socket *sk, struct dcpeer *peer) +{ +} + +void trpipeerr(struct socket *sk, int errno, struct dcpeer *peer) { peer->state = PEER_SYNC; - dctransgotdata(transfer, peer); + dctransgotdata(peer->transfer, peer); + CBUNREG(peer->transfer, trans_filterout, peer); +} + +static struct socket *mktrpipe(struct dcpeer *peer) +{ + struct socket *sk; + + sk = netsockpipe(); + sk->data = peer; + sk->readcb = (void (*)(struct socket *, void *))trpiperead; + sk->writecb = (void (*)(struct socket *, void *))trpipewrite; + sk->errcb = (void (*)(struct socket *, int, void *))trpipeerr; + return(sk); } static void transread(struct socket *sk, struct dcpeer *peer) { void *buf; size_t bufsize; - struct transfer *transfer; - if(transferdatasize(peer->transfer) < 0) + if(sockqueueleft(peer->trpipe) < 0) return; if((buf = sockgetinbuf(sk, &bufsize)) == NULL) return; @@ -2788,23 +2797,17 @@ static void transread(struct socket *sk, struct dcpeer *peer) freedcpeer(peer); return; } - transferputdata(peer->transfer, buf, bufsize); + sockqueue(peer->trpipe, buf, bufsize); free(buf); if(peer->transfer->curpos >= peer->transfer->size) { - transfer = peer->transfer; - transferdetach(transfer); - transferendofdata(transfer); + closesock(peer->trpipe); + quitsock(peer->trpipe); + peer->close = 1; return; } } -static void dcwantdata(struct transfer *transfer, struct dcpeer *peer) -{ - if(transferdatasize(transfer) > 0) - transread(peer->sk, peer); -} - static void transerr(struct socket *sk, int err, struct dcpeer *peer) { struct transfer *transfer; @@ -2814,8 +2817,9 @@ static void transerr(struct socket *sk, int err, struct dcpeer *peer) freedcpeer(peer); return; } - transferdetach(transfer); - transferendofdata(transfer); + closesock(peer->trpipe); + quitsock(peer->trpipe); + peer->close = 1; } static void transwrite(struct socket *sk, struct dcpeer *peer) @@ -3133,10 +3137,11 @@ static void freedcpeer(struct dcpeer *peer) peer->prev->next = peer->next; if(peer->trpipe != NULL) { closesock(peer->trpipe); - putsock(peer->trpipe); + quitsock(peer->trpipe); } if(peer->transfer != NULL) { + CBUNREG(peer->transfer, trans_filterout, peer); if(peer->transfer->dir == TRNSD_UP) peer->transfer->close = 1; if(peer->transfer->dir == TRNSD_DOWN) diff --git a/daemon/net.c b/daemon/net.c index 6f83266..ed7ad9b 100644 --- a/daemon/net.c +++ b/daemon/net.c @@ -390,6 +390,14 @@ void putsock(struct socket *sk) } } +void quitsock(struct socket *sk) +{ + sk->readcb = NULL; + sk->writecb = NULL; + sk->errcb = NULL; + putsock(sk); +} + static void linksock(struct scons **list, struct socket *sk) { struct scons *sc; @@ -771,6 +779,18 @@ size_t sockgetdatalen(struct socket *sk) /* return(sockgetdatalen(sk->back)); */ /* } */ +size_t socktqueuesize(struct socket *sk) +{ + size_t ret; + + ret = 0; + while(1) { + ret += sockgetdatalen(sk->back); + if((sk = sk->back->pnext) == NULL) + return(ret); + } +} + ssize_t sockqueueleft(struct socket *sk) { return(sk->back->maxbuf - sockgetdatalen(sk->back)); diff --git a/daemon/net.h b/daemon/net.h index 9ae9650..b539860 100644 --- a/daemon/net.h +++ b/daemon/net.h @@ -87,6 +87,7 @@ void freedgbuf(struct dgrambuf *dg); void sockqueue(struct socket *sk, void *data, size_t size); void sockerror(struct socket *sk, int en); /* size_t sockqueuesize(struct socket *sk); */ +size_t socktqueuesize(struct socket *sk); ssize_t sockqueueleft(struct socket *sk); int netresolve(char *addr, void (*callback)(struct sockaddr *addr, int addrlen, void *data), void *data); struct socket *netcsdgram(struct sockaddr *name, socklen_t namelen); @@ -109,5 +110,6 @@ void sockpushdata(struct socket *sk, void *buf, size_t size); int sockpeeraddr(struct socket *sk, struct sockaddr **namebuf, socklen_t *lenbuf); int getucred(struct socket *sk, uid_t *uid, gid_t *gid); int sockfamily(struct socket *sk); +void quitsock(struct socket *sk); #endif diff --git a/daemon/transfer.c b/daemon/transfer.c index 9c50ed1..f23a3ed 100644 --- a/daemon/transfer.c +++ b/daemon/transfer.c @@ -134,6 +134,10 @@ static void localread(struct socket *sk, struct transfer *transfer) if((transfer->datapipe != NULL) && (sockqueueleft(transfer->datapipe) > 0)) { buf = sockgetinbuf(sk, &blen); sockqueue(transfer->datapipe, buf, blen); + time(&transfer->activity); + transfer->curpos += blen; + bytesupload += blen; + CBCHAINDOCB(transfer, trans_p, transfer); } } @@ -145,6 +149,9 @@ static void dataread(struct socket *sk, struct transfer *transfer) if((transfer->localend != NULL) && (sockqueueleft(transfer->localend) > 0)) { buf = sockgetinbuf(sk, &blen); sockqueue(transfer->localend, buf, blen); + transfer->curpos += blen; + bytesdownload += blen; + CBCHAINDOCB(transfer, trans_p, transfer); } } @@ -168,8 +175,14 @@ static void localerr(struct socket *sk, int errno, struct transfer *transfer) static void dataerr(struct socket *sk, int errno, struct transfer *transfer) { - if(transfer->localend != NULL) + if(transfer->curpos >= transfer->size) { + transfersetstate(transfer, TRNS_DONE); closesock(transfer->localend); + quitsock(transfer->localend); + transfer->localend = NULL; + } else { + resettransfer(transfer); + } } void transferattach(struct transfer *transfer, struct socket *dpipe) @@ -185,11 +198,8 @@ void transferattach(struct transfer *transfer, struct socket *dpipe) void transferdetach(struct transfer *transfer) { if(transfer->datapipe != NULL) { - transfer->datapipe->readcb = NULL; - transfer->datapipe->writecb = NULL; - transfer->datapipe->errcb = NULL; closesock(transfer->datapipe); - putsock(transfer->datapipe); + quitsock(transfer->datapipe); } transfer->datapipe = NULL; } @@ -281,59 +291,6 @@ static void transexpire(int cancelled, struct transfer *transfer) transfer->timeout = 0; } -static void transferputdata(struct transfer *transfer, void *buf, size_t size) -{ - time(&transfer->activity); - sockqueue(transfer->localend, buf, size); - transfer->curpos += size; - bytesdownload += size; - CBCHAINDOCB(transfer, trans_p, transfer); -} - -static void transferendofdata(struct transfer *transfer) -{ - if(transfer->curpos >= transfer->size) - { - transfersetstate(transfer, TRNS_DONE); - transfer->localend->readcb = NULL; - transfer->localend->writecb = NULL; - transfer->localend->errcb = NULL; - putsock(transfer->localend); - transfer->localend = NULL; - } else { - resettransfer(transfer); - } -} - -static ssize_t transferdatasize(struct transfer *transfer) -{ - return(sockqueueleft(transfer->localend)); -} - -static void *transfergetdata(struct transfer *transfer, size_t *size) -{ - void *buf; - - if(transfer->localend == NULL) - return(NULL); - time(&transfer->activity); - if((buf = sockgetinbuf(transfer->localend, size)) == NULL) - return(NULL); - if((transfer->endpos >= 0) && (transfer->curpos + *size >= transfer->endpos)) - { - if((*size = transfer->endpos - transfer->curpos) == 0) { - free(buf); - buf = NULL; - } else { - buf = srealloc(buf, *size); - } - } - transfer->curpos += *size; - bytesupload += *size; - CBCHAINDOCB(transfer, trans_p, transfer); - return(buf); -} - void transferprepul(struct transfer *transfer, off_t size, off_t start, off_t end, struct socket *lesk) { transfersetsize(transfer, size);