python: Added a channel superclass for ashd.async.
[ashd.git] / lib / mtio.c
... / ...
CommitLineData
1/*
2 ashd - A Sane HTTP Daemon
3 Copyright (C) 2008 Fredrik Tolf <fredrik@dolda2000.com>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22#include <stdlib.h>
23#include <stdio.h>
24#include <unistd.h>
25#include <fcntl.h>
26#include <string.h>
27#include <errno.h>
28#include <sys/socket.h>
29
30#include <log.h>
31#include <utils.h>
32#include <mt.h>
33#include <mtio.h>
34#include <bufio.h>
35
36static ssize_t mtrecv(struct stdiofd *d, void *buf, size_t len)
37{
38 struct msghdr msg;
39 char cbuf[512];
40 struct cmsghdr *cmsg;
41 struct iovec bufvec;
42 socklen_t clen;
43 ssize_t ret;
44 int i, *fds;
45
46 msg = (struct msghdr){};
47 msg.msg_iov = &bufvec;
48 msg.msg_iovlen = 1;
49 bufvec.iov_base = buf;
50 bufvec.iov_len = len;
51 msg.msg_control = cbuf;
52 msg.msg_controllen = sizeof(cbuf);
53 if((ret = recvmsg(d->fd, &msg, MSG_DONTWAIT)) < 0)
54 return(ret);
55 for(cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
56 if((cmsg->cmsg_level == SOL_SOCKET) && (cmsg->cmsg_type == SCM_RIGHTS)) {
57 fds = (int *)CMSG_DATA(cmsg);
58 clen = (cmsg->cmsg_len - ((char *)fds - (char *)cmsg)) / sizeof(*fds);
59 for(i = 0; i < clen; i++) {
60 if(d->rights < 0)
61 d->rights = fds[i];
62 else
63 close(fds[i]);
64 }
65 }
66 }
67 return(ret);
68}
69
70static ssize_t mtread(void *cookie, void *buf, size_t len)
71{
72 struct stdiofd *d = cookie;
73 int ev;
74 ssize_t ret;
75
76 while(1) {
77 if(d->sock)
78 ret = mtrecv(d, buf, len);
79 else
80 ret = read(d->fd, buf, len);
81 if((ret < 0) && (errno == EAGAIN)) {
82 ev = block(d->fd, EV_READ, d->timeout);
83 if(ev < 0) {
84 /* If we just go on, we should get the real error. */
85 continue;
86 } else if(ev == 0) {
87 errno = ETIMEDOUT;
88 return(-1);
89 } else {
90 continue;
91 }
92 } else {
93 return(ret);
94 }
95 }
96}
97
98static ssize_t mtsend(struct stdiofd *d, const void *buf, size_t len)
99{
100 struct msghdr msg;
101 struct cmsghdr *cmsg;
102 char cbuf[CMSG_SPACE(sizeof(int))];
103 struct iovec bufvec;
104 ssize_t ret;
105 int cr;
106
107 msg = (struct msghdr){};
108 msg.msg_iov = &bufvec;
109 msg.msg_iovlen = 1;
110 bufvec.iov_base = (void *)buf;
111 bufvec.iov_len = len;
112 cr = -1;
113 if(d->sendrights >= 0) {
114 msg.msg_control = cbuf;
115 msg.msg_controllen = sizeof(cbuf);
116 cmsg = CMSG_FIRSTHDR(&msg);
117 cmsg->cmsg_level = SOL_SOCKET;
118 cmsg->cmsg_type = SCM_RIGHTS;
119 cmsg->cmsg_len = CMSG_LEN(sizeof(int));
120 *((int *)CMSG_DATA(cmsg)) = d->sendrights;
121 cr = d->sendrights;
122 d->sendrights = -1;
123 msg.msg_controllen = cmsg->cmsg_len;
124 }
125 ret = sendmsg(d->fd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
126 if(cr >= 0)
127 close(cr);
128 return(ret);
129}
130
131static ssize_t mtwrite(void *cookie, const void *buf, size_t len)
132{
133 struct stdiofd *d = cookie;
134 int ev;
135 ssize_t ret;
136
137 while(1) {
138 if(d->sock)
139 ret = mtsend(d, buf, len);
140 else
141 ret = write(d->fd, buf, len);
142 if((ret < 0) && (errno == EAGAIN)) {
143 ev = block(d->fd, EV_WRITE, d->timeout);
144 if(ev < 0) {
145 /* If we just go on, we should get the real error. */
146 continue;
147 } else if(ev == 0) {
148 errno = ETIMEDOUT;
149 return(-1);
150 }
151 } else {
152 return(ret);
153 }
154 }
155}
156
157static int mtclose(void *cookie)
158{
159 struct stdiofd *d = cookie;
160
161 close(d->fd);
162 if(d->rights >= 0)
163 close(d->rights);
164 if(d->sendrights >= 0)
165 close(d->sendrights);
166 free(d);
167 return(0);
168}
169
170FILE *mtstdopen(int fd, int issock, int timeout, char *mode, struct stdiofd **infop)
171{
172 struct stdiofd *d;
173 FILE *ret;
174 int r, w;
175
176 if(!strcmp(mode, "r")) {
177 r = 1; w = 0;
178 } else if(!strcmp(mode, "w")) {
179 r = 0; w = 1;
180 } else if(!strcmp(mode, "r+")) {
181 r = w = 1;
182 } else {
183 return(NULL);
184 }
185 omalloc(d);
186 d->fd = fd;
187 d->sock = issock;
188 d->timeout = timeout;
189 d->rights = d->sendrights = -1;
190 if(!(ret = funstdio(d, r?mtread:NULL, w?mtwrite:NULL, NULL, mtclose))) {
191 free(d);
192 return(NULL);
193 }
194 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
195 if(infop)
196 *infop = d;
197 return(ret);
198}
199
200struct bufio *mtbioopen(int fd, int issock, int timeout, char *mode, struct stdiofd **infop)
201{
202 static struct bufioops ops = {
203 .read = mtread, .write = mtwrite, .close = mtclose,
204 };
205 struct stdiofd *d;
206 struct bufio *ret;
207
208 if(!strcmp(mode, "r")) {
209 } else if(!strcmp(mode, "w")) {
210 } else if(!strcmp(mode, "r+")) {
211 } else {
212 return(NULL);
213 }
214 omalloc(d);
215 d->fd = fd;
216 d->sock = issock;
217 d->timeout = timeout;
218 d->rights = d->sendrights = -1;
219 if(!(ret = bioopen(d, &ops))) {
220 free(d);
221 return(NULL);
222 }
223 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
224 if(infop)
225 *infop = d;
226 return(ret);
227}
228
229struct pipe {
230 struct charbuf data;
231 size_t bufmax;
232 int closed;
233 struct muth *r, *w;
234};
235
236static void freepipe(struct pipe *p)
237{
238 buffree(p->data);
239 free(p);
240}
241
242static ssize_t piperead(void *pdata, void *buf, size_t len)
243{
244 struct pipe *p = pdata;
245 ssize_t ret;
246
247 while(p->data.d == 0) {
248 if(p->closed & 2)
249 return(0);
250 if(p->r) {
251 errno = EBUSY;
252 return(-1);
253 }
254 p->r = current;
255 yield();
256 p->r = NULL;
257 }
258 ret = min(len, p->data.d);
259 memcpy(buf, p->data.b, ret);
260 memmove(p->data.b, p->data.b + ret, p->data.d -= ret);
261 if(p->w)
262 resume(p->w, 0);
263 return(ret);
264}
265
266static int piperclose(void *pdata)
267{
268 struct pipe *p = pdata;
269
270 if(p->closed & 2) {
271 freepipe(p);
272 } else {
273 p->closed |= 1;
274 if(p->w)
275 resume(p->w, 0);
276 }
277 return(0);
278}
279
280static ssize_t pipewrite(void *pdata, const void *buf, size_t len)
281{
282 struct pipe *p = pdata;
283 ssize_t ret;
284
285 if(p->closed & 1) {
286 errno = EPIPE;
287 return(-1);
288 }
289 while(p->data.d >= p->bufmax) {
290 if(p->w) {
291 errno = EBUSY;
292 return(-1);
293 }
294 p->w = current;
295 yield();
296 p->w = NULL;
297 if(p->closed & 1) {
298 errno = EPIPE;
299 return(-1);
300 }
301 }
302 ret = min(len, p->bufmax - p->data.d);
303 sizebuf(p->data, p->data.d + ret);
304 memcpy(p->data.b + p->data.d, buf, ret);
305 p->data.d += ret;
306 if(p->r)
307 resume(p->r, 0);
308 return(ret);
309}
310
311static int pipewclose(void *pdata)
312{
313 struct pipe *p = pdata;
314
315 if(p->closed & 1) {
316 freepipe(p);
317 } else {
318 p->closed |= 2;
319 if(p->r)
320 resume(p->r, 0);
321 }
322 return(0);
323}
324
325void mtiopipe(FILE **read, FILE **write)
326{
327 struct pipe *p;
328
329 omalloc(p);
330 p->bufmax = 4096;
331 *read = funstdio(p, piperead, NULL, NULL, piperclose);
332 *write = funstdio(p, NULL, pipewrite, NULL, pipewclose);
333}