--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/dpoke.c Sat Mar 16 00:03:51 2013 +0400
@@ -0,0 +1,250 @@
+#define _POSIX_C_SOURCE 199309L
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <math.h>
+
+
+enum DPokeSourceType_en {
+ DPOKE_PROGRAM,
+ DPOKE_FILE,
+ DPOKE_FUNCTION,
+};
+typedef enum DPokeSourceType_en DPokeSourceType;
+
+#define DPOKE_BUFFER 32
+struct DPokeSource_st {
+ DPokeSourceType type;
+ char *path;
+ int arg;
+
+ //FILE *stream;
+ int fd;
+ enum DPokeSourceStatus_en {
+ DPOKE_STARTED,
+ DPOKE_RUNNING,
+ DPOKE_DIED,
+ DPOKE_FAIL,
+ DPOKE_FAILED,
+ } status;
+ char buffer[DPOKE_BUFFER+1];
+ char current[DPOKE_BUFFER+1];
+ int buffer_usage;
+
+ // DPOKE_PROGRAM-specific fields:
+ int pid;
+};
+
+typedef struct DPokeSource_st DPokeSource;
+
+#define LENGTH(X) (sizeof X / sizeof X[0])
+
+char* smprintf(char *fmt, ...) {
+ va_list fmtargs;
+ char *ret;
+ int len;
+
+ va_start(fmtargs, fmt);
+ len = vsnprintf(NULL, 0, fmt, fmtargs);
+ va_end(fmtargs);
+
+ ret = malloc(++len);
+ if (ret == NULL) {
+ perror("malloc");
+ exit(1);
+ }
+
+ va_start(fmtargs, fmt);
+ vsnprintf(ret, len, fmt, fmtargs);
+ va_end(fmtargs);
+
+ return ret;
+}
+
+
+void display();
+
+extern char* v[];
+#include "config.h"
+#ifdef DEBUG
+#define errprintf(...) fprintf(stderr,...)
+#else
+#define errprintf(...)
+#endif
+
+char* v[LENGTH(sources)];
+
+static struct pollfd source_fds[LENGTH(sources)];
+
+void source_open(int source_id) {
+ DPokeSource *src = &sources[source_id];
+ //FILE* stream;
+ int p_stdout[2];
+ switch (src->type) {
+ case DPOKE_PROGRAM:
+ //stream = popen(src->path, "r");
+ if (pipe(p_stdout)!=0) {
+ perror("pipe");
+ exit(7);
+ }
+ pid_t pid = fork();
+ if (pid < 0) {
+ perror("fork");
+ exit(7);
+ } else if (pid == 0) {
+ dup2(open("/dev/null",O_RDONLY),0);
+ close(p_stdout[0]);
+ dup2(p_stdout[1],1);
+ execl("/bin/sh", "sh", "-c", src->path, NULL);
+ perror("execl");
+ exit(1);
+ }
+ src->fd = p_stdout[0];
+ src->pid = pid;
+ break;
+ default:
+ errprintf("Don't know how to handle source type %d.\n",src->type);
+ exit(2);
+ }
+ //src->stream = stream;
+ //src->fd = fileno(stream);
+ src->buffer_usage = 0;
+ src->status = DPOKE_STARTED;
+ source_fds[source_id].fd = sources[source_id].fd;
+ source_fds[source_id].events = POLLIN;
+}
+
+static void sigchld_hdl (int sig)
+{
+ pid_t pid;
+ while ((pid = waitpid(-1, NULL, WNOHANG)) > 0) {
+ for (int i=0;i<LENGTH(sources);i++)
+ if ((sources[i].type == DPOKE_PROGRAM) && (sources[i].pid == pid)) {
+ sources[i].pid = -1;
+ errprintf("#%d pid %d died.\n",i,pid);
+ switch (sources[i].status) {
+ case DPOKE_STARTED:
+ sources[i].status = DPOKE_FAIL; break;
+ case DPOKE_RUNNING:
+ sources[i].status = DPOKE_DIED; break;
+ default:
+ errprintf("#%d died in state %d. Whaddafuck?!\n",i,sources[i].status);
+ exit(8);
+ }
+ }
+ }
+}
+
+void source_close(int source_id) {
+ DPokeSource *src = &sources[source_id];
+ if (src->pid != -1) kill(src->pid,SIGTERM);
+ if (src->fd != -1) close(src->fd);
+ source_fds[source_id].fd = -1;
+}
+
+double dpoke_time() {
+ struct timespec tp;
+ if (clock_gettime(CLOCK_MONOTONIC,&tp)) {
+ perror("clock_gettime");
+ exit(5);
+ }
+ return tp.tv_sec + tp.tv_nsec*1E-9;
+}
+
+int main(int argc, char* argv[]) {
+ struct sigaction act;
+
+ memset (&act, 0, sizeof(act));
+ act.sa_handler = sigchld_hdl;
+ if (sigaction(SIGCHLD, &act, 0)) {
+ perror ("sigaction");
+ exit(6);
+ }
+
+ for (int i = 0; i < LENGTH(sources); i++ ) {
+ source_open(i);
+ v[i] = sources[i].current;
+ }
+ int status;
+ int sleeptime = 10000;
+ double prevtime=dpoke_time();
+ while (1) {
+ status = poll(source_fds,LENGTH(sources), sleeptime);
+ if (status < 0) {
+ if (errno == EINTR)
+ continue;
+ }
+ for (int i = 0; i < LENGTH(sources); i++) {
+ DPokeSource *src = sources+i;
+ if (source_fds[i].revents & POLLIN) {
+ int data_read = read(src->fd,src->buffer+src->buffer_usage,DPOKE_BUFFER-src->buffer_usage);
+ if (data_read>0) {
+ src->buffer_usage += data_read;
+ if (src->buffer_usage==DPOKE_BUFFER) {
+ errprintf("Buffer is full for #%d.\n",i);
+ }
+ src->buffer[src->buffer_usage] = '\0';
+ char* eolpos = strchr(src->buffer + src->buffer_usage - data_read,'\n');
+ if (eolpos) {
+ memcpy(src->current,src->buffer,eolpos-src->buffer);
+ src->current[eolpos-src->buffer] = '\0';
+ src->buffer_usage -= (eolpos - src->buffer + 1);
+ memmove(src->buffer,eolpos + 1,src->buffer_usage + 1);
+ }
+ if (src->status == DPOKE_STARTED)
+ src->status = DPOKE_RUNNING;
+ } else {
+ perror("read");
+ exit(3);
+ }
+ } else if ((source_fds[i].revents & POLLHUP)||(src->status==DPOKE_DIED)) {
+ errprintf("#%d HUP\n",i);
+ source_close(i);
+ if (src->status==DPOKE_STARTED)
+ src->status = DPOKE_FAIL;
+ else
+ source_open(i);
+ } else if (source_fds[i].revents) {
+ errprintf("#%d revents: %d\n",i,source_fds[i].revents);
+ }
+ if (src->status==DPOKE_FAIL) {
+ src->status = DPOKE_FAILED;
+ errprintf("#%d Marked as failure\n",i);
+ strcpy(src->current,FAILURE_MSG);
+ }
+ }
+ double curtime = dpoke_time();
+ if ((curtime-prevtime)>MINTIME) {
+ display();
+ prevtime = curtime;
+ sleeptime = 10000;
+ } else {
+ double sleepd = (prevtime+MINTIME - curtime)*1000;
+ sleeptime = ceil(sleepd);
+ errprintf("%f %f %f %f\n",prevtime,curtime,MINTIME,sleepd);
+ }
+ errprintf("Will sleep for %d\n",sleeptime);
+ }
+ return 0;
+}
+/*
+ABCnDEF0
+01234567
+eolpos = 3
+bu = 7
+3-0+1 = 4
+bu = 3
+memmove:
+01234567
+34534567
+nDEn
+*/