dpoke.c
author Stiletto <blasux@blasux.ru>
Sat, 17 Aug 2019 15:27:14 +0400
changeset 19 bf780d158f3e
parent 1 3e9290bf7249
permissions -rw-r--r--
Pass argv to setup()

#define _POSIX_C_SOURCE 199309L
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <time.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <math.h>


// Kind of data source. source_open() in this file only implements
// DPOKE_PROGRAM; any other value makes it report an error and exit.
enum DPokeSourceType_en {
    DPOKE_PROGRAM,  // shell command run through /bin/sh -c; its stdout is read via a pipe
    DPOKE_FILE,     // declared, but not handled by source_open() in this file
    DPOKE_FUNCTION, // declared, but not handled by source_open() in this file
};
typedef enum DPokeSourceType_en DPokeSourceType;

// Capacity, in bytes, of the per-source read buffer (excluding the NUL terminator).
#define DPOKE_BUFFER 128
// State of one data source. The sources[] array used throughout this file is
// not defined here (presumably in config.h - verify); do not reorder fields
// without checking how that array is initialised.
struct DPokeSource_st {
    DPokeSourceType type;   // how the source is opened, see source_open()
    char *path;             // for DPOKE_PROGRAM: command line handed to "sh -c"
    int arg;                // not referenced by the code in this file

    //FILE *stream;
    int fd;                 // descriptor read by the main loop; source_close() treats -1 as "not open"
    enum DPokeSourceStatus_en {
        DPOKE_STARTED,      // set by source_open(); no data has been read yet
        DPOKE_RUNNING,      // at least one read() returned data
        DPOKE_DIED,         // child exited while RUNNING; the main loop restarts it
        DPOKE_FAIL,         // child exited (or hung up) while still STARTED
        DPOKE_FAILED,       // FAIL acknowledged by the main loop; FAILURE_MSG was copied into current
    } status;
    char buffer[DPOKE_BUFFER+1];    // bytes read so far that do not yet form a complete line (+1 for NUL)
    char current[DPOKE_BUFFER+1];   // most recently completed line, NUL-terminated; exported through v[]
    int buffer_usage;               // number of valid bytes in buffer

    // DPOKE_PROGRAM-specific fields:
    int pid;                // child process id; the SIGCHLD handler sets it to -1 once reaped
};

typedef struct DPokeSource_st DPokeSource;

// Number of elements in the array X. X must be a real array, not a pointer.
// The argument is parenthesized so that any array-valued expression groups
// as intended inside both sizeof operands.
#define LENGTH(X)               (sizeof (X) / sizeof (X)[0])

/*
 * printf-style formatting into freshly allocated memory.
 *
 * fmt and the variadic arguments follow the printf() conventions.
 * Returns a NUL-terminated string obtained from malloc(); the caller owns it
 * and releases it with free().
 * The process exits with status 1 if the format cannot be processed or if
 * the allocation fails, so the function never returns NULL.
 */
char *
smprintf(char *fmt, ...)
{
        va_list fmtargs;
        char *ret;
        int len;
        size_t size;

        /* First pass: only measure the formatted length. */
        va_start(fmtargs, fmt);
        len = vsnprintf(NULL, 0, fmt, fmtargs);
        va_end(fmtargs);

        /* vsnprintf reports encoding/format errors with a negative value;
         * without this check the code would allocate zero bytes and hand
         * back an unterminated buffer. */
        if (len < 0) {
                perror("vsnprintf");
                exit(1);
        }

        size = (size_t)len + 1;         /* room for the terminating NUL */
        ret = malloc(size);
        if (ret == NULL) {
                perror("malloc");
                exit(1);
        }

        /* Second pass: the argument list must be restarted before reuse. */
        va_start(fmtargs, fmt);
        vsnprintf(ret, size, fmt, fmtargs);
        va_end(fmtargs);

        return ret;
}

// Hooks that this file calls but does not define (presumably supplied by
// config.h - verify):
//   display() - called from main() with v and the number of sources
//   setup()   - called once at the start of main() with the program arguments
//   cleanup() - called by die() with the exit code just before exit()
void display(char *v[], size_t vc);
void setup(int argc, char **argv);
void cleanup(int exitcode);

// v is declared before config.h is included so that code in that header can
// refer to it; the definition follows once LENGTH(sources) is known.
extern char* v[];
#include "config.h"
// Debug tracing to stderr. Without DEBUG the macro expands to nothing, so its
// arguments are not evaluated at all.
#ifdef DEBUG
#define errprintf(...) fprintf(stderr,__VA_ARGS__)
#else
#define errprintf(...)
#endif

// One string per source; main() points v[i] at sources[i].current.
char* v[LENGTH(sources)];

// poll() descriptor table, index-aligned with sources[].
static struct pollfd source_fds[LENGTH(sources)];

/*
 * Terminate the program: announce the exit code on stderr, give cleanup()
 * a chance to run, then exit with that code. Does not return.
 */
void
die(int exitcode)
{
    fprintf(stderr, "Dying with exit code %d\n", exitcode);

    cleanup(exitcode);

    exit(exitcode);
}

/*
 * Start (or restart) source #source_id and register it with the poll table.
 *
 * For DPOKE_PROGRAM the command in src->path is run through "/bin/sh -c"
 * with stdin connected to /dev/null and stdout connected to a pipe whose
 * read end becomes src->fd. Any other source type is fatal (exit code 2);
 * pipe()/fork() failures are fatal with exit code 7.
 *
 * On return the source is in state DPOKE_STARTED with an empty buffer.
 */
void source_open(int source_id) {
    DPokeSource *src = &sources[source_id];
    int p_stdout[2];
    sigset_t chld_mask, saved_mask;

    /* Hold SIGCHLD back until pid and status are recorded. Otherwise a child
     * that exits immediately could be reaped by the handler before src->pid
     * is stored (and so never be matched), or be matched while src->status
     * still holds the value left over from the previous run. */
    sigemptyset(&chld_mask);
    sigaddset(&chld_mask, SIGCHLD);
    sigprocmask(SIG_BLOCK, &chld_mask, &saved_mask);

    switch (src->type) {
        case DPOKE_PROGRAM: {
            if (pipe(p_stdout)!=0) {
                perror("pipe");
                die(7);
            }
            pid_t pid = fork();
            if (pid < 0) {
                perror("fork");
                die(7);
            } else if (pid == 0) {
                /* Child. The signal mask survives exec, so undo the block. */
                sigprocmask(SIG_SETMASK, &saved_mask, NULL);

                /* stdin <- /dev/null (best effort, as before), without
                 * leaving the extra descriptor open in the program. */
                int devnull = open("/dev/null",O_RDONLY);
                if (devnull > 0) {
                    dup2(devnull,0);
                    close(devnull);
                }

                close(p_stdout[0]);
                if (p_stdout[1] != 1) {
                    if (dup2(p_stdout[1],1) < 0) {
                        perror("dup2");
                        _exit(1);
                    }
                    close(p_stdout[1]);
                }

                /* The sentinel of a variadic exec call must be a pointer. */
                execl("/bin/sh", "sh", "-c", src->path, (char *)NULL);
                perror("execl");
                /* _exit: do not run the parent's atexit handlers or flush
                 * its inherited stdio buffers from the child. */
                _exit(1);
            }
            /* NOTE(review): the parent's copy of the write end, p_stdout[1],
             * is left open exactly as before, so one descriptor is leaked per
             * (re)start. Closing it here would make poll() report POLLHUP
             * when the child exits, which changes what the main loop and the
             * SIGCHLD handler observe; that needs to be changed together
             * with them. */
            src->fd = p_stdout[0];
            src->pid = pid;
            break;
        }
        default:
            errprintf("Don't know how to handle source type %d.\n",src->type);
            die(2);
    }
    src->buffer_usage = 0;
    src->status = DPOKE_STARTED;
    source_fds[source_id].fd = src->fd;
    source_fds[source_id].events = POLLIN;

    /* A SIGCHLD that arrived meanwhile is delivered now, with consistent state. */
    sigprocmask(SIG_SETMASK, &saved_mask, NULL);
}

/*
 * SIGCHLD handler: reap every exited child and record the death in the
 * matching source.
 *
 *   STARTED -> FAIL   the program exited before producing any output
 *   RUNNING -> DIED   the program exited after producing output
 *
 * A death in any other state is treated as an internal error (exit code 8).
 *
 * NOTE(review): errprintf() (with DEBUG) and die() call fprintf()/exit(),
 * which are not async-signal-safe; they are kept for behavioural
 * compatibility but should eventually move out of the handler.
 */
static void sigchld_hdl (int sig)
{
    /* waitpid() overwrites errno (e.g. ECHILD when nothing is left); the
     * interrupted code may be about to inspect errno, so preserve it. */
    int saved_errno = errno;
    pid_t pid;

    (void)sig;

    while ((pid = waitpid(-1, NULL, WNOHANG)) > 0) {
        for (int i = 0; i < (int)LENGTH(sources); i++) {
            if ((sources[i].type != DPOKE_PROGRAM) || (sources[i].pid != pid))
                continue;

            sources[i].pid = -1;
            errprintf("#%d pid %d died.\n",i,(int)pid);
            switch (sources[i].status) {
                case DPOKE_STARTED:
                    sources[i].status = DPOKE_FAIL; break;
                case DPOKE_RUNNING:
                    sources[i].status = DPOKE_DIED; break;
                default:
                    errprintf("#%d died in state %d. Whaddafuck?!\n",i,sources[i].status);
                    die(8);
            }
            /* A pid belongs to at most one source. */
            break;
        }
    }

    errno = saved_errno;
}

/*
 * Stop source #source_id: ask its child process (if any) to terminate,
 * close the read descriptor and remove the source from the poll table.
 * Safe to call more than once for the same source.
 */
void source_close(int source_id) {
    DPokeSource *src = &sources[source_id];

    /* Only signal a real child. kill() with pid 0 or a negative pid would
     * address a whole process group - including this process. */
    if (src->pid > 0) {
        kill(src->pid,SIGTERM);
        /* The child is no longer tracked; the SIGCHLD handler still reaps
         * it, but must not match it against this source any more. */
        src->pid = -1;
    }
    if (src->fd != -1) {
        close(src->fd);
        /* Forget the descriptor so a second call cannot close an unrelated
         * descriptor that happened to reuse the same number. */
        src->fd = -1;
    }
    /* poll() ignores entries with a negative fd. */
    source_fds[source_id].fd = -1;
}

/*
 * Current reading of the monotonic clock, in seconds (with a fractional
 * part taken from the nanosecond field). Exits with code 5 if the clock
 * cannot be read.
 */
double dpoke_time() {
    struct timespec now;
    const int rc = clock_gettime(CLOCK_MONOTONIC, &now);

    if (rc != 0) {
        perror("clock_gettime");
        die(5);
    }

    return now.tv_sec + now.tv_nsec*1E-9;
}

int main(int argc, char* argv[]) {
    setup(argc, argv);
    struct sigaction act;

    memset (&act, 0, sizeof(act));
    act.sa_handler = sigchld_hdl;
    if (sigaction(SIGCHLD, &act, 0)) {
        perror ("sigaction");
        die(6);
    }

    for (int i = 0; i < LENGTH(sources); i++ ) {
        source_open(i);
        v[i] = sources[i].current;
    }
    int status;
    int sleeptime = 10000;
    double prevtime=dpoke_time();
    while (1) {
        status = poll(source_fds,LENGTH(sources), sleeptime);
        if (status < 0) {
            if (errno == EINTR)
                continue;
        }
        for (int i = 0; i < LENGTH(sources); i++) {
            DPokeSource *src = sources+i;
            if (source_fds[i].revents & POLLIN) {
                errprintf("Event: %d\n", source_fds[i].revents);
                int data_read = read(src->fd,src->buffer+src->buffer_usage,DPOKE_BUFFER-src->buffer_usage);
                if (data_read>0) {
                    src->buffer_usage += data_read;
                    if (src->buffer_usage>=DPOKE_BUFFER) {
                        fprintf(stderr, "Buffer is full for #%d. Resetting.\n",i);
                        src->buffer_usage = 0;
                    }
                    src->buffer[src->buffer_usage] = '\0';
                    char* eolpos = strchr(src->buffer + src->buffer_usage - data_read,'\n');
                    if (eolpos) {
                        memcpy(src->current,src->buffer,eolpos-src->buffer);
                        src->current[eolpos-src->buffer] = '\0';
                        src->buffer_usage -= (eolpos - src->buffer + 1);
                        memmove(src->buffer,eolpos + 1,src->buffer_usage + 1);
                    }
                    if (src->status == DPOKE_STARTED)
                        src->status = DPOKE_RUNNING;
                } else {
                    fprintf(stderr, "Buffer usage: %d\n", src->buffer_usage);
                    perror("read");
                    die(3);
                }
            } else if ((source_fds[i].revents & POLLHUP)||(src->status==DPOKE_DIED)) {
                errprintf("#%d HUP\n",i);
                source_close(i);
                if (src->status==DPOKE_STARTED)
                    src->status = DPOKE_FAIL;
                else
                    source_open(i);
            } else if (source_fds[i].revents) {
                errprintf("#%d revents: %d\n",i,source_fds[i].revents);
            }
            if (src->status==DPOKE_FAIL) {
                src->status = DPOKE_FAILED;
                errprintf("#%d Marked as failure\n",i);
                strcpy(src->current,FAILURE_MSG);
            }
        }
        double curtime = dpoke_time();
        if ((curtime-prevtime)>MINTIME) {
            display(v,LENGTH(sources));
            prevtime = curtime;
            sleeptime = 10000;
        } else {
            double sleepd = (prevtime+MINTIME - curtime)*1000;
            sleeptime = ceil(sleepd);
            errprintf("%f %f %f %f\n",prevtime,curtime,MINTIME,sleepd);
        }
        errprintf("Will sleep for %d\n",sleeptime);
    }
    cleanup(0);
    return 0;
}
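/*
Worked example: dropping one consumed line from a source buffer.
('n' stands for '\n', '0' for the terminating NUL.)

  buffer:  A B C n D E F 0
  index:   0 1 2 3 4 5 6 7

  eolpos = 3, buffer_usage (bu) = 7
  bytes consumed = eolpos - 0 + 1 = 4
  bu = 7 - 4 = 3

  memmove(buffer, eolpos + 1, bu + 1) copies indices 4..7 to 0..3:

  index:   0 1 2 3 4 5 6 7
  from:    4 5 6 7 . . . .
  buffer:  D E F 0 D E F 0

The "+ 1" in the length carries the NUL terminator along with the data.
*/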