Use epoll() and signalfd() in ifchd.

This commit is contained in:
Nicholas J. Kain 2010-12-01 12:22:08 -05:00
parent fb56a9cb2b
commit f2b4527179
2 changed files with 156 additions and 116 deletions

View File

@ -1,3 +0,0 @@
write documentation...
support BSD

View File

@ -1,5 +1,5 @@
/* ifchd.c - interface change daemon /* ifchd.c - interface change daemon
* Time-stamp: <2010-11-13 08:07:54 njk> * Time-stamp: <2010-12-01 12:19:39 njk>
* *
* (C) 2004-2010 Nicholas J. Kain <njkain at gmail dot com> * (C) 2004-2010 Nicholas J. Kain <njkain at gmail dot com>
* *
@ -26,6 +26,8 @@
#include <sys/un.h> #include <sys/un.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
@ -71,7 +73,9 @@ enum states {
STATE_WINS STATE_WINS
}; };
static volatile sig_atomic_t pending_exit; static int epollfd, signalFd;
/* Extra two event slots are for signalFd and the listen socket. */
static struct epoll_event events[SOCK_QUEUE+2];
/* Socket fd, current state, and idle time for connections. */ /* Socket fd, current state, and idle time for connections. */
static int sks[SOCK_QUEUE], state[SOCK_QUEUE], idle_time[SOCK_QUEUE]; static int sks[SOCK_QUEUE], state[SOCK_QUEUE], idle_time[SOCK_QUEUE];
@ -102,23 +106,6 @@ static pid_t peer_pid;
static strlist_t *namesvrs[SOCK_QUEUE]; static strlist_t *namesvrs[SOCK_QUEUE];
static strlist_t *domains[SOCK_QUEUE]; static strlist_t *domains[SOCK_QUEUE];
static void sighandler(int sig) {
pending_exit = 1;
}
static void fix_signals(void) {
disable_signal(SIGPIPE);
disable_signal(SIGUSR1);
disable_signal(SIGUSR2);
disable_signal(SIGTSTP);
disable_signal(SIGTTIN);
disable_signal(SIGCHLD);
disable_signal(SIGHUP);
hook_signal(SIGINT, sighandler, 0);
hook_signal(SIGTERM, sighandler, 0);
}
static void die_nulstr(strlist_t *p) static void die_nulstr(strlist_t *p)
{ {
if (!p) if (!p)
@ -133,6 +120,29 @@ static void writeordie(int fd, const char *buf, int len)
suicide("write returned error"); suicide("write returned error");
} }
static void epoll_add(int fd)
{
struct epoll_event ev;
int r;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
ev.data.fd = fd;
r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
if (r == -1)
suicide("epoll_add failed %s", strerror(errno));
}
static void epoll_del(int fd)
{
struct epoll_event ev;
int r;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
ev.data.fd = fd;
r = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
if (r == -1)
suicide("epoll_del failed %s", strerror(errno));
}
/* Abstracts away the details of accept()ing a socket connection. */
/* Writes out each element in a strlist as an argument to a keyword in /* Writes out each element in a strlist as an argument to a keyword in
* a file. */ * a file. */
static void write_resolve_list(const char *keyword, strlist_t *list) static void write_resolve_list(const char *keyword, strlist_t *list)
@ -300,6 +310,7 @@ static void add_sk(int sk)
for (i = 0; i < SOCK_QUEUE; i++) for (i = 0; i < SOCK_QUEUE; i++)
if (sks[i] == -1) { if (sks[i] == -1) {
new_sk(i, sk); new_sk(i, sk);
epoll_add(sk);
return; return;
} }
} }
@ -315,6 +326,7 @@ static void close_idle_sk(void)
if (sks[i] == -1) if (sks[i] == -1)
continue; continue;
if (time(NULL) - idle_time[i] > CONN_TIMEOUT) { if (time(NULL) - idle_time[i] > CONN_TIMEOUT) {
epoll_del(sks[i]);
close(sks[i]); close(sks[i]);
new_sk(i, -1); new_sk(i, -1);
} }
@ -532,7 +544,6 @@ static int get_listen(void)
return lsock; return lsock;
} }
/* Abstracts away the details of accept()ing a socket connection. */
static void accept_conns(int *lsock) static void accept_conns(int *lsock)
{ {
int ret; int ret;
@ -567,8 +578,12 @@ static void accept_conns(int *lsock)
case ENOTSOCK: case ENOTSOCK:
case EINVAL: case EINVAL:
log_line("warning: accept returned %s!\n", strerror(errno)); log_line("warning: accept returned %s!\n", strerror(errno));
epoll_del(*lsock);
close(*lsock); close(*lsock);
*lsock = get_listen(); *lsock = get_listen();
epoll_add(*lsock);
return; return;
case ECONNABORTED: case ECONNABORTED:
@ -585,110 +600,138 @@ static void accept_conns(int *lsock)
} }
} }
/* Core function that handles connections, gathers input, and calls static void setup_signals()
* the state machine to do actual work. */ {
static void dispatch_work(void) sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGPIPE);
sigaddset(&mask, SIGUSR1);
sigaddset(&mask, SIGUSR2);
sigaddset(&mask, SIGTSTP);
sigaddset(&mask, SIGTTIN);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
if (sigprocmask(SIG_BLOCK, &mask, NULL) < 0)
suicide("sigprocmask failed");
signalFd = signalfd(-1, &mask, SFD_NONBLOCK);
if (signalFd < 0)
suicide("signalfd failed");
}
static void signal_dispatch()
{
int t, off = 0;
struct signalfd_siginfo si;
again:
t = read(signalFd, (char *)&si + off, sizeof si - off);
if (t < sizeof si - off) {
if (t < 0) {
if (t == EAGAIN || t == EWOULDBLOCK || t == EINTR)
goto again;
else
suicide("signalfd read error");
}
off += t;
}
switch (si.ssi_signo) {
case SIGINT:
case SIGTERM:
exit(EXIT_SUCCESS);
default:
break;
}
}
static void process_client_fd(int fd)
{ {
int lsock, ret, highfd, i, index;
fd_set rfds;
char buf[MAX_BUF]; char buf[MAX_BUF];
int r, index, sqidx = -1;
/* Initialize all structures to blank state. */ for (int j = 0; j < SOCK_QUEUE; ++j) {
for (i=0; i<SOCK_QUEUE; i++) if (sks[j] == fd) {
sks[i] = -1; sqidx = j;
initialize_if_data();
lsock = get_listen();
for (;;) {
FD_ZERO(&rfds);
FD_SET(lsock, &rfds);
/* find highest fd */
for (i=0, highfd=0; i<SOCK_QUEUE; i++) {
if (sks[i] != -1) {
FD_SET(sks[i], &rfds);
if (sks[i] > highfd)
highfd = sks[i];
}
}
if (lsock > highfd)
highfd = lsock;
ret = select(highfd + 1, &rfds, NULL, NULL, NULL);
switch (ret) {
case 0:
close_idle_sk();
break;
case -1:
if (pending_exit == 1)
return;
suicide("dispatch_work - select returned an error!");
break; break;
} }
if (pending_exit == 1)
return;
/* handle pending connections */
if (FD_ISSET(lsock, &rfds))
accept_conns(&lsock);
/* Read in and process data on waiting connections */
for (i=0; i<SOCK_QUEUE; i++) {
if (sks[i] == -1)
continue;
if (!FD_ISSET(sks[i], &rfds))
continue;
idle_time[i] = time(NULL);
memset(buf, '\0', sizeof(buf));
read_again:
ret = (int) read(sks[i], buf, MAX_BUF / 2 - 1);
/* Check to see if peer closed socket */
if (ret == 0) {
close(sks[i]);
new_sk(i, -1);
continue;
} }
if (sqidx == -1)
suicide("epoll returned pending read for untracked fd");
if (ret == -1) { idle_time[sqidx] = time(NULL);
if (errno == EINTR) memset(buf, '\0', sizeof buf);
goto read_again;
log_line("dispatch_work: read returned %s.\n", strerror(errno)); r = safe_read(sks[sqidx], buf, sizeof buf / 2 - 1);
close(sks[i]); if (r <= 0) {
new_sk(i, -1); if (r != 0)
continue; log_line("error reading from client fd: %s", strerror(errno));
goto fail;
} }
/* Discard everything and close connection if we risk overflow. /* Discard everything and close connection if we risk overflow.
* This approach is maximally conservative... worst case is that * This approach is maximally conservative... worst case is that
* some client requests will get dropped. */ * some client requests will get dropped. */
index = strlen(ibuf[i]); index = strlen(ibuf[sqidx]);
if (index + strlen(buf) > MAX_BUF - 2) { if (index + strlen(buf) > sizeof buf - 2)
close(sks[i]); goto fail;
new_sk(i, -1);
continue;
}
/* Append new stream input avoiding overflow. */ /* Append new stream input avoiding overflow. */
strlcpy(ibuf[i] + index, buf, sizeof(ibuf[i]) - index); strlcpy(ibuf[sqidx] + index, buf, sizeof ibuf[sqidx] - index);
/* Decompose ibuf contents onto strlist. */ /* Decompose ibuf contents onto strlist. */
index = stream_onto_list(i); index = stream_onto_list(sqidx);
/* Remove everything that we've parsed into the list. */ /* Remove everything that we've parsed into the list. */
strlcpy(buf, ibuf[i] + index, sizeof(buf)); strlcpy(buf, ibuf[sqidx] + index, sizeof buf);
strlcpy(ibuf[i], buf, sizeof(ibuf[i])); strlcpy(ibuf[sqidx], buf, sizeof ibuf[sqidx]);
/* Now we have a strlist of commands and arguments. /* Now we have a strlist of commands and arguments.
* Decompose and execute it. */ * Decompose and execute it. */
if (!head[i]) if (!head[sqidx])
return;
curl[sqidx] = head[sqidx];
execute_list(sqidx);
return;
fail:
epoll_del(sks[sqidx]);
close(sks[sqidx]);
new_sk(sqidx, -1);
}
/* Core function that handles connections, gathers input, and calls
* the state machine to do actual work. */
static void dispatch_work(void)
{
int lsock;
/* Initialize all structures to blank state. */
for (int i = 0; i < SOCK_QUEUE; i++)
sks[i] = -1;
initialize_if_data();
lsock = get_listen();
epollfd = epoll_create1(0);
if (epollfd == -1)
suicide("epoll_create1 failed");
epoll_add(lsock);
epoll_add(signalFd);
for (;;) {
int r = epoll_wait(epollfd, events, SOCK_QUEUE + 2, -1);
if (r == -1) {
if (errno == EINTR)
continue; continue;
curl[i] = head[i]; else
execute_list(i); suicide("epoll_wait failed");
}
for (int i = 0; i < r; ++i) {
int fd = events[i].data.fd;
if (fd == lsock)
accept_conns(&lsock);
else if (fd == signalFd)
signal_dispatch();
else
process_client_fd(fd);
} }
close_idle_sk(); close_idle_sk();
} }
@ -875,7 +918,7 @@ int main(int argc, char** argv) {
write_pid(pidfile); write_pid(pidfile);
umask(077); umask(077);
fix_signals(); setup_signals();
/* If we are requested to update resolv.conf, preopen the fd before /* If we are requested to update resolv.conf, preopen the fd before
* we drop root privileges, making sure that if we create * we drop root privileges, making sure that if we create