#define _POSIX_C_SOURCE 199309L
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <time.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <math.h>
enum DPokeSourceType_en {
DPOKE_PROGRAM,
DPOKE_FILE,
DPOKE_FUNCTION,
};
typedef enum DPokeSourceType_en DPokeSourceType;
#define DPOKE_BUFFER 128
struct DPokeSource_st {
DPokeSourceType type;
char *path;
int arg;
//FILE *stream;
int fd;
enum DPokeSourceStatus_en {
DPOKE_STARTED,
DPOKE_RUNNING,
DPOKE_DIED,
DPOKE_FAIL,
DPOKE_FAILED,
} status;
char buffer[DPOKE_BUFFER+1];
char current[DPOKE_BUFFER+1];
int buffer_usage;
// DPOKE_PROGRAM-specific fields:
int pid;
};
typedef struct DPokeSource_st DPokeSource;
#define LENGTH(X) (sizeof X / sizeof X[0])
char *
smprintf(char *fmt, ...)
{
va_list fmtargs;
char *ret;
int len;
va_start(fmtargs, fmt);
len = vsnprintf(NULL, 0, fmt, fmtargs);
va_end(fmtargs);
ret = malloc(++len);
if (ret == NULL) {
perror("malloc");
exit(1);
}
va_start(fmtargs, fmt);
vsnprintf(ret, len, fmt, fmtargs);
va_end(fmtargs);
return ret;
}
void display(char *v[], size_t vc);
void setup(int argc, char **argv);
void cleanup(int exitcode);
extern char* v[];
#include "config.h"
#ifdef DEBUG
#define errprintf(...) fprintf(stderr,__VA_ARGS__)
#else
#define errprintf(...)
#endif
char* v[LENGTH(sources)];
static struct pollfd source_fds[LENGTH(sources)];
void die(int exitcode) {
fprintf(stderr, "Dying with exit code %d\n", exitcode);
cleanup(exitcode);
exit(exitcode);
}
void source_open(int source_id) {
DPokeSource *src = &sources[source_id];
//FILE* stream;
int p_stdout[2];
switch (src->type) {
case DPOKE_PROGRAM:
//stream = popen(src->path, "r");
if (pipe(p_stdout)!=0) {
perror("pipe");
die(7);
}
pid_t pid = fork();
if (pid < 0) {
perror("fork");
die(7);
} else if (pid == 0) {
dup2(open("/dev/null",O_RDONLY),0);
close(p_stdout[0]);
dup2(p_stdout[1],1);
execl("/bin/sh", "sh", "-c", src->path, NULL);
perror("execl");
exit(1);
}
src->fd = p_stdout[0];
src->pid = pid;
break;
default:
errprintf("Don't know how to handle source type %d.\n",src->type);
die(2);
}
//src->stream = stream;
//src->fd = fileno(stream);
src->buffer_usage = 0;
src->status = DPOKE_STARTED;
source_fds[source_id].fd = sources[source_id].fd;
source_fds[source_id].events = POLLIN;
}
static void sigchld_hdl (int sig)
{
pid_t pid;
while ((pid = waitpid(-1, NULL, WNOHANG)) > 0) {
for (int i=0;i<LENGTH(sources);i++)
if ((sources[i].type == DPOKE_PROGRAM) && (sources[i].pid == pid)) {
sources[i].pid = -1;
errprintf("#%d pid %d died.\n",i,pid);
switch (sources[i].status) {
case DPOKE_STARTED:
sources[i].status = DPOKE_FAIL; break;
case DPOKE_RUNNING:
sources[i].status = DPOKE_DIED; break;
default:
errprintf("#%d died in state %d. Whaddafuck?!\n",i,sources[i].status);
die(8);
}
}
}
}
void source_close(int source_id) {
DPokeSource *src = &sources[source_id];
if (src->pid != -1) kill(src->pid,SIGTERM);
if (src->fd != -1) close(src->fd);
source_fds[source_id].fd = -1;
}
double dpoke_time() {
struct timespec tp;
if (clock_gettime(CLOCK_MONOTONIC,&tp)) {
perror("clock_gettime");
die(5);
}
return tp.tv_sec + tp.tv_nsec*1E-9;
}
int main(int argc, char* argv[]) {
setup(argc, argv);
struct sigaction act;
memset (&act, 0, sizeof(act));
act.sa_handler = sigchld_hdl;
if (sigaction(SIGCHLD, &act, 0)) {
perror ("sigaction");
die(6);
}
for (int i = 0; i < LENGTH(sources); i++ ) {
source_open(i);
v[i] = sources[i].current;
}
int status;
int sleeptime = 10000;
double prevtime=dpoke_time();
while (1) {
status = poll(source_fds,LENGTH(sources), sleeptime);
if (status < 0) {
if (errno == EINTR)
continue;
}
for (int i = 0; i < LENGTH(sources); i++) {
DPokeSource *src = sources+i;
if (source_fds[i].revents & POLLIN) {
errprintf("Event: %d\n", source_fds[i].revents);
int data_read = read(src->fd,src->buffer+src->buffer_usage,DPOKE_BUFFER-src->buffer_usage);
if (data_read>0) {
src->buffer_usage += data_read;
if (src->buffer_usage>=DPOKE_BUFFER) {
fprintf(stderr, "Buffer is full for #%d. Resetting.\n",i);
src->buffer_usage = 0;
}
src->buffer[src->buffer_usage] = '\0';
char* eolpos = strchr(src->buffer + src->buffer_usage - data_read,'\n');
if (eolpos) {
memcpy(src->current,src->buffer,eolpos-src->buffer);
src->current[eolpos-src->buffer] = '\0';
src->buffer_usage -= (eolpos - src->buffer + 1);
memmove(src->buffer,eolpos + 1,src->buffer_usage + 1);
}
if (src->status == DPOKE_STARTED)
src->status = DPOKE_RUNNING;
} else {
fprintf(stderr, "Buffer usage: %d\n", src->buffer_usage);
perror("read");
die(3);
}
} else if ((source_fds[i].revents & POLLHUP)||(src->status==DPOKE_DIED)) {
errprintf("#%d HUP\n",i);
source_close(i);
if (src->status==DPOKE_STARTED)
src->status = DPOKE_FAIL;
else
source_open(i);
} else if (source_fds[i].revents) {
errprintf("#%d revents: %d\n",i,source_fds[i].revents);
}
if (src->status==DPOKE_FAIL) {
src->status = DPOKE_FAILED;
errprintf("#%d Marked as failure\n",i);
strcpy(src->current,FAILURE_MSG);
}
}
double curtime = dpoke_time();
if ((curtime-prevtime)>MINTIME) {
display(v,LENGTH(sources));
prevtime = curtime;
sleeptime = 10000;
} else {
double sleepd = (prevtime+MINTIME - curtime)*1000;
sleeptime = ceil(sleepd);
errprintf("%f %f %f %f\n",prevtime,curtime,MINTIME,sleepd);
}
errprintf("Will sleep for %d\n",sleeptime);
}
cleanup(0);
return 0;
}
/*
ABCnDEF0
01234567
eolpos = 3
bu = 7
3-0+1 = 4
bu = 3
memmove:
01234567
34534567
nDEn
*/