Major rework to use cbchains on sockets.
[doldaconnect.git] / daemon / transfer.c
index b2be0fe..ab2579d 100644 (file)
 #include "client.h"
 
 static void killfilter(struct transfer *transfer);
+static int transferread(struct socket *sk, struct transfer *transfer);
+static int transferwrite(struct socket *sk, struct transfer *transfer);
+static int transfererr(struct socket *sk, int errno, struct transfer *transfer);
+static int filterread(struct socket *sk, struct transfer *transfer);
 
+unsigned long long bytesupload = 0;
+unsigned long long bytesdownload = 0;
 struct transfer *transfers = NULL;
 int numtransfers = 0;
 GCBCHAIN(newtransfercb, struct transfer *);
@@ -78,18 +84,18 @@ void freetransfer(struct transfer *transfer)
        free(transfer->filterbuf);
     if(transfer->hash != NULL)
        freehash(transfer->hash);
+    if(transfer->exitstatus != NULL)
+       free(transfer->exitstatus);
     if(transfer->localend != NULL)
     {
-       transfer->localend->readcb = NULL;
-       transfer->localend->writecb = NULL;
-       transfer->localend->errcb = NULL;
+       CBUNREG(transfer->localend, socket_read, transferread, transfer);
+       CBUNREG(transfer->localend, socket_write, transferwrite, transfer);
+       CBUNREG(transfer->localend, socket_err, transfererr, transfer);
        putsock(transfer->localend);
     }
     if(transfer->filterout != NULL)
     {
-       transfer->filterout->readcb = NULL;
-       transfer->filterout->writecb = NULL;
-       transfer->filterout->errcb = NULL;
+       CBUNREG(transfer->filterout, socket_read, filterread, transfer);
        putsock(transfer->filterout);
     }
     if(transfer->fn != NULL)
@@ -151,6 +157,18 @@ struct transfer *finddownload(wchar_t *peerid)
     return(transfer);
 }
 
+struct transfer *hasupload(struct fnet *fnet, wchar_t *peerid)
+{
+    struct transfer *transfer;
+    
+    for(transfer = transfers; transfer != NULL; transfer = transfer->next)
+    {
+       if((transfer->dir == TRNSD_UP) && (transfer->fnet == fnet) && !wcscmp(transfer->peerid, peerid))
+           break;
+    }
+    return(transfer);
+}
+
 struct transfer *newupload(struct fnetnode *fn, struct fnet *fnet, wchar_t *nickid, struct transferiface *iface, void *data)
 {
     struct transfer *transfer;
@@ -215,24 +233,27 @@ static void transexpire(int cancelled, struct transfer *transfer)
        transfer->timeout = 0;
 }
 
-static void transferread(struct socket *sk, struct transfer *transfer)
+static int transferread(struct socket *sk, struct transfer *transfer)
 {
     if(sockgetdatalen(sk) >= 65536)
        sk->ignread = 1;
     if((transfer->iface != NULL) && (transfer->iface->gotdata != NULL))
        transfer->iface->gotdata(transfer, transfer->ifacedata);
+    return(0);
 }
 
-static void transferwrite(struct socket *sk, struct transfer *transfer)
+static int transferwrite(struct socket *sk, struct transfer *transfer)
 {
     if((transfer->iface != NULL) && (transfer->iface->wantdata != NULL))
        transfer->iface->wantdata(transfer, transfer->ifacedata);
+    return(0);
 }
 
-static void transfererr(struct socket *sk, int errno, struct transfer *transfer)
+static int transfererr(struct socket *sk, int errno, struct transfer *transfer)
 {
     if((transfer->iface != NULL) && (transfer->iface->endofdata != NULL))
        transfer->iface->endofdata(transfer, transfer->ifacedata);
+    return(0);
 }
 
 void transferputdata(struct transfer *transfer, void *buf, size_t size)
@@ -240,6 +261,7 @@ void transferputdata(struct transfer *transfer, void *buf, size_t size)
     time(&transfer->activity);
     sockqueue(transfer->localend, buf, size);
     transfer->curpos += size;
+    bytesdownload += size;
     CBCHAINDOCB(transfer, trans_p, transfer);
 }
 
@@ -248,9 +270,9 @@ void transferendofdata(struct transfer *transfer)
     if(transfer->curpos >= transfer->size)
     {
        transfersetstate(transfer, TRNS_DONE);
-       transfer->localend->readcb = NULL;
-       transfer->localend->writecb = NULL;
-       transfer->localend->errcb = NULL;
+       CBUNREG(transfer->localend, socket_read, transferread, transfer);
+       CBUNREG(transfer->localend, socket_write, transferwrite, transfer);
+       CBUNREG(transfer->localend, socket_err, transfererr, transfer);
        putsock(transfer->localend);
        transfer->localend = NULL;
     } else {
@@ -279,6 +301,7 @@ void *transfergetdata(struct transfer *transfer, size_t *size)
        buf = srealloc(buf, *size);
     }
     transfer->curpos += *size;
+    bytesupload += *size;
     CBCHAINDOCB(transfer, trans_p, transfer);
     return(buf);
 }
@@ -311,20 +334,64 @@ void transfersetlocalend(struct transfer *transfer, struct socket *sk)
     if(transfer->localend != NULL)
        putsock(transfer->localend);
     getsock(transfer->localend = sk);
-    sk->data = transfer;
-    sk->readcb = (void (*)(struct socket *, void *))transferread;
-    sk->writecb = (void (*)(struct socket *, void *))transferwrite;
-    sk->errcb = (void (*)(struct socket *, int, void *))transfererr;
+    CBREG(sk, socket_read, (int (*)(struct socket *, void *))transferread, NULL, transfer);
+    CBREG(sk, socket_write, (int (*)(struct socket *, void *))transferwrite, NULL, transfer);
+    CBREG(sk, socket_err, (int (*)(struct socket *, int, void *))transfererr, NULL, transfer);
 }
 
-void bumptransfer(struct transfer *transfer)
+static int tryreq(struct transfer *transfer)
 {
     struct fnetnode *fn;
     struct fnetpeer *peer;
+    
+    if((fn = transfer->fn) != NULL)
+    {
+       if(fn->state != FNN_EST)
+       {
+           transfer->close = 1;
+           return(1);
+       }
+       peer = fnetfindpeer(fn, transfer->peerid);
+    } else {
+       peer = NULL;
+       for(fn = fnetnodes; fn != NULL; fn = fn->next)
+       {
+           if((fn->state == FNN_EST) && (fn->fnet == transfer->fnet) && ((peer = fnetfindpeer(fn, transfer->peerid)) != NULL))
+               break;
+       }
+    }
+    if(peer != NULL)
+    {
+       time(&transfer->lastreq);
+       return(fn->fnet->reqconn(peer));
+    }
+    return(1);
+}
+
+void trytransferbypeer(struct fnet *fnet, wchar_t *peerid)
+{
+    struct transfer *transfer;
+    
+    for(transfer = transfers; transfer != NULL; transfer = transfer->next)
+    {
+       if((transfer->dir == TRNSD_DOWN) && (transfer->state == TRNS_WAITING))
+       {
+           if((transfer->fnet == fnet) && !wcscmp(transfer->peerid, peerid))
+           {
+               if(!tryreq(transfer))
+                   return;
+           }
+       }
+    }
+}
+
+void bumptransfer(struct transfer *transfer)
+{
     time_t now;
     
     if((now = time(NULL)) < transfer->timeout)
     {
+
        if(transfer->etimer == NULL)
            transfer->etimer = timercallback(transfer->timeout, (void (*)(int, void *))transexpire, transfer);
        return;
@@ -334,32 +401,9 @@ void bumptransfer(struct transfer *transfer)
     switch(transfer->state)
     {
     case TRNS_WAITING:
-       if(transfer->fn != NULL)
-       {
-           fn = transfer->fn;
-           if(fn->state != FNN_EST)
-           {
-               transfer->close = 1;
-               return;
-           }
-           peer = fnetfindpeer(fn, transfer->peerid);
-       } else {
-           peer = NULL;
-           for(fn = fnetnodes; fn != NULL; fn = fn->next)
-           {
-               if((fn->state == FNN_EST) && (fn->fnet == transfer->fnet) && ((peer = fnetfindpeer(fn, transfer->peerid)) != NULL))
-                   break;
-           }
-       }
        transfer->etimer = timercallback(transfer->timeout = (time(NULL) + 30), (void (*)(int, void *))transexpire, transfer);
        if(now - transfer->lastreq > 30)
-       {
-           if(peer != NULL)
-           {
-               fn->fnet->reqconn(peer);
-               time(&transfer->lastreq);
-           }
-       }
+           tryreq(transfer);
        break;
     case TRNS_HS:
        if(transfer->dir == TRNSD_UP)
@@ -468,15 +512,15 @@ static void killfilter(struct transfer *transfer)
     }
     if(transfer->localend)
     {
-       transfer->localend->readcb = NULL;
-       transfer->localend->writecb = NULL;
-       transfer->localend->errcb = NULL;
+       CBUNREG(transfer->localend, socket_read, transferread, transfer);
+       CBUNREG(transfer->localend, socket_write, transferwrite, transfer);
+       CBUNREG(transfer->localend, socket_err, transfererr, transfer);
        putsock(transfer->localend);
        transfer->localend = NULL;
     }
     if(transfer->filterout)
     {
-       transfer->filterout->readcb = NULL;
+       CBUNREG(transfer->filterout, socket_read, filterread, transfer);
        putsock(transfer->filterout);
        transfer->filterout = NULL;
     }
@@ -540,17 +584,28 @@ static char *findfilter(struct passwd *pwd)
     return(NULL);
 }
 
-static void filterread(struct socket *sk, struct transfer *transfer)
+static void handletranscmd(struct transfer *transfer, wchar_t *cmd, wchar_t *arg)
+{
+    if(!wcscmp(cmd, L"status")) {
+       if(arg == NULL)
+           arg = L"";
+       if(transfer->exitstatus != NULL)
+           free(transfer->exitstatus);
+       transfer->exitstatus = swcsdup(arg);
+    }
+}
+
+static int filterread(struct socket *sk, struct transfer *transfer)
 {
     char *buf, *p, *p2;
     size_t bufsize;
     wchar_t *cmd, *arg;
     
     if((buf = sockgetinbuf(sk, &bufsize)) == NULL)
-       return;
+       return(0);
     bufcat(transfer->filterbuf, buf, bufsize);
     free(buf);
-    if((p = memchr(transfer->filterbuf, '\n', transfer->filterbufdata)) != NULL)
+    while((p = memchr(transfer->filterbuf, '\n', transfer->filterbufdata)) != NULL)
     {
        *(p++) = 0;
        if((p2 = strchr(transfer->filterbuf, ' ')) != NULL)
@@ -561,8 +616,9 @@ static void filterread(struct socket *sk, struct transfer *transfer)
            if(p2 != NULL)
            {
                if((arg = icmbstowcs(p2, NULL)) == NULL)
-                   flog(LOG_WARNING, "filter sent a string which could not be converted into the local charset: %s: %s", transfer->filterbuf, strerror(errno));
+                   flog(LOG_WARNING, "filter sent a string which could not be converted into the local charset: %s: %s", p2, strerror(errno));
            }
+           handletranscmd(transfer, cmd, arg);
            CBCHAINDOCB(transfer, trans_filterout, transfer, cmd, arg);
            if(arg != NULL)
                free(arg);
@@ -572,11 +628,14 @@ static void filterread(struct socket *sk, struct transfer *transfer)
        }
        memmove(transfer->filterbuf, p, transfer->filterbufdata -= (p - transfer->filterbuf));
     }
+    return(0);
 }
 
 static void filterexit(pid_t pid, int status, void *data)
 {
     struct transfer *transfer;
+    struct fnet *fnet;
+    wchar_t *peerid;
     
     for(transfer = transfers; transfer != NULL; transfer = transfer->next)
     {
@@ -584,12 +643,14 @@ static void filterexit(pid_t pid, int status, void *data)
        {
            transfer->filter = -1;
            killfilter(transfer);
+           fnet = transfer->fnet;
+           peerid = swcsdup(transfer->peerid);
            if(WEXITSTATUS(status))
-           {
                resettransfer(transfer);
-           } else {
+           else
                freetransfer(transfer);
-           }
+           trytransferbypeer(fnet, peerid);
+           free(peerid);
            break;
        }
     }
@@ -704,8 +765,7 @@ int forkfilter(struct transfer *transfer)
     transfer->filter = pid;
     transfersetlocalend(transfer, insock);
     getsock(transfer->filterout = outsock);
-    outsock->data = transfer;
-    outsock->readcb = (void (*)(struct socket *, void *))filterread;
+    CBREG(outsock, socket_read, (int (*)(struct socket *, void *))filterread, NULL, transfer);
     putsock(insock);
     putsock(outsock);
     free(filtername);
@@ -746,6 +806,7 @@ static struct configvar myvars[] =
     {CONF_VAR_INT, "ultos", {.num = SOCK_TOS_MAXTP}},
     {CONF_VAR_INT, "dltos", {.num = SOCK_TOS_MAXTP}},
     {CONF_VAR_STRING, "filter", {.str = L"dc-filter"}},
+    {CONF_VAR_BOOL, "ulquota", {.num = 0}},
     {CONF_VAR_END}
 };