diff -r 000000000000 -r a22a319f5129 dpoke.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dpoke.c Sat Mar 16 00:03:51 2013 +0400 @@ -0,0 +1,250 @@ +#define _POSIX_C_SOURCE 199309L +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +enum DPokeSourceType_en { + DPOKE_PROGRAM, + DPOKE_FILE, + DPOKE_FUNCTION, +}; +typedef enum DPokeSourceType_en DPokeSourceType; + +#define DPOKE_BUFFER 32 +struct DPokeSource_st { + DPokeSourceType type; + char *path; + int arg; + + //FILE *stream; + int fd; + enum DPokeSourceStatus_en { + DPOKE_STARTED, + DPOKE_RUNNING, + DPOKE_DIED, + DPOKE_FAIL, + DPOKE_FAILED, + } status; + char buffer[DPOKE_BUFFER+1]; + char current[DPOKE_BUFFER+1]; + int buffer_usage; + + // DPOKE_PROGRAM-specific fields: + int pid; +}; + +typedef struct DPokeSource_st DPokeSource; + +#define LENGTH(X) (sizeof X / sizeof X[0]) + +char* smprintf(char *fmt, ...) { + va_list fmtargs; + char *ret; + int len; + + va_start(fmtargs, fmt); + len = vsnprintf(NULL, 0, fmt, fmtargs); + va_end(fmtargs); + + ret = malloc(++len); + if (ret == NULL) { + perror("malloc"); + exit(1); + } + + va_start(fmtargs, fmt); + vsnprintf(ret, len, fmt, fmtargs); + va_end(fmtargs); + + return ret; +} + + +void display(); + +extern char* v[]; +#include "config.h" +#ifdef DEBUG +#define errprintf(...) fprintf(stderr,...) +#else +#define errprintf(...) +#endif + +char* v[LENGTH(sources)]; + +static struct pollfd source_fds[LENGTH(sources)]; + +void source_open(int source_id) { + DPokeSource *src = &sources[source_id]; + //FILE* stream; + int p_stdout[2]; + switch (src->type) { + case DPOKE_PROGRAM: + //stream = popen(src->path, "r"); + if (pipe(p_stdout)!=0) { + perror("pipe"); + exit(7); + } + pid_t pid = fork(); + if (pid < 0) { + perror("fork"); + exit(7); + } else if (pid == 0) { + dup2(open("/dev/null",O_RDONLY),0); + close(p_stdout[0]); + dup2(p_stdout[1],1); + execl("/bin/sh", "sh", "-c", src->path, NULL); + perror("execl"); + exit(1); + } + src->fd = p_stdout[0]; + src->pid = pid; + break; + default: + errprintf("Don't know how to handle source type %d.\n",src->type); + exit(2); + } + //src->stream = stream; + //src->fd = fileno(stream); + src->buffer_usage = 0; + src->status = DPOKE_STARTED; + source_fds[source_id].fd = sources[source_id].fd; + source_fds[source_id].events = POLLIN; +} + +static void sigchld_hdl (int sig) +{ + pid_t pid; + while ((pid = waitpid(-1, NULL, WNOHANG)) > 0) { + for (int i=0;ipid != -1) kill(src->pid,SIGTERM); + if (src->fd != -1) close(src->fd); + source_fds[source_id].fd = -1; +} + +double dpoke_time() { + struct timespec tp; + if (clock_gettime(CLOCK_MONOTONIC,&tp)) { + perror("clock_gettime"); + exit(5); + } + return tp.tv_sec + tp.tv_nsec*1E-9; +} + +int main(int argc, char* argv[]) { + struct sigaction act; + + memset (&act, 0, sizeof(act)); + act.sa_handler = sigchld_hdl; + if (sigaction(SIGCHLD, &act, 0)) { + perror ("sigaction"); + exit(6); + } + + for (int i = 0; i < LENGTH(sources); i++ ) { + source_open(i); + v[i] = sources[i].current; + } + int status; + int sleeptime = 10000; + double prevtime=dpoke_time(); + while (1) { + status = poll(source_fds,LENGTH(sources), sleeptime); + if (status < 0) { + if (errno == EINTR) + continue; + } + for (int i = 0; i < LENGTH(sources); i++) { + DPokeSource *src = sources+i; + if (source_fds[i].revents & POLLIN) { + int data_read = read(src->fd,src->buffer+src->buffer_usage,DPOKE_BUFFER-src->buffer_usage); + if (data_read>0) { + src->buffer_usage += data_read; + if (src->buffer_usage==DPOKE_BUFFER) { + errprintf("Buffer is full for #%d.\n",i); + } + src->buffer[src->buffer_usage] = '\0'; + char* eolpos = strchr(src->buffer + src->buffer_usage - data_read,'\n'); + if (eolpos) { + memcpy(src->current,src->buffer,eolpos-src->buffer); + src->current[eolpos-src->buffer] = '\0'; + src->buffer_usage -= (eolpos - src->buffer + 1); + memmove(src->buffer,eolpos + 1,src->buffer_usage + 1); + } + if (src->status == DPOKE_STARTED) + src->status = DPOKE_RUNNING; + } else { + perror("read"); + exit(3); + } + } else if ((source_fds[i].revents & POLLHUP)||(src->status==DPOKE_DIED)) { + errprintf("#%d HUP\n",i); + source_close(i); + if (src->status==DPOKE_STARTED) + src->status = DPOKE_FAIL; + else + source_open(i); + } else if (source_fds[i].revents) { + errprintf("#%d revents: %d\n",i,source_fds[i].revents); + } + if (src->status==DPOKE_FAIL) { + src->status = DPOKE_FAILED; + errprintf("#%d Marked as failure\n",i); + strcpy(src->current,FAILURE_MSG); + } + } + double curtime = dpoke_time(); + if ((curtime-prevtime)>MINTIME) { + display(); + prevtime = curtime; + sleeptime = 10000; + } else { + double sleepd = (prevtime+MINTIME - curtime)*1000; + sleeptime = ceil(sleepd); + errprintf("%f %f %f %f\n",prevtime,curtime,MINTIME,sleepd); + } + errprintf("Will sleep for %d\n",sleeptime); + } + return 0; +} +/* +ABCnDEF0 +01234567 +eolpos = 3 +bu = 7 +3-0+1 = 4 +bu = 3 +memmove: +01234567 +34534567 +nDEn +*/