Archive for the ‘C’ Category

Multitasking: PHP in parallel

March 2nd, 2010 Nick No comments

Like most websites, glogster.com periodically sends out an e-newsletter to its user base. This is done by a simple PHP script that opens a socket to a mail server and goes through the user database, sending an email to each user. This approach worked fine until recently, but the number of users has gone up dramatically and it now takes an infeasibly long time to send emails to all our users.

After optimising the script as much as possible, the only remaining option was to run many instances of the script simultaneously. PHP doesn’t support multithreading, and although PHP does support POSIX-style process control ( http://www.php.net/manual/en/book.pcntl.php ), I decided to create a wrapper in C that would handle multiple instances of PHP. (I haven’t found anything technically wrong with PHP’s process control; I just needed something stable, reliable and potentially able to multitask other scripts, such as Python.)

How it works

The wrapper I made (see below for code listing) is fairly simple and works by creating a series of threads, each of which forks and creates a PHP instance (replacing the child process) that runs the emailing script. The thread in the parent process waits for the PHP process to finish and then ends itself. Once a thread ends, the main thread creates another one, thereby maintaining a constant number of threads. Signals are handled by a separate thread, which can either tell the main thread to stop creating new threads and so shut the program down safely (SIGINT), or directly terminate all child processes and thus end the program abruptly (SIGTERM or SIGQUIT).

Almost done

So now I had a way of running any number of PHP processes simultaneously and was almost ready to send emails at lightning speed. However, one problem remained. Once an email has been sent to a user, the database is updated so that I know which user has received which email, which prevents me from sending the same user the same email more than once. The problem is this: what if one instance of the PHP script reads a user and, before it has time to send them the email and update the database, another PHP instance reads the same user and also sends them the email? One way to solve this would be to lock the database table before reading and updating each row, but this would slow the database down, which is not ideal.

The solution I came up with was to limit the number of simultaneous PHP instances to 8 and have the wrapper send each PHP process a unique instance ID from 0 to 7.   When selecting users from the database, the script looked at the 3 least significant bits (LSBs) of the IDs in the recipients table and selected only those rows whose 3 LSBs equated to the instance ID passed from the wrapper.

SELECT *,
(
	if( (`id` | 1) = `id`, 1, 0) +
	if( (`id` | 2) = `id`, 2, 0) +
	if( (`id` | 4) = `id`, 4, 0)
) AS `IID`
FROM `recipients`
WHERE `sent` = 0
HAVING `IID` = :IID
LIMIT :OFFSET, :BATCH_SIZE;

Just bind the instance ID passed from the wrapper into the query and each instance will now select recipients that are unique to that ID – no two concurrent instances can select the same recipients. Just don’t forget to update each row once each email has been sent.

Finishing touches

We’re almost there; just one more little feature is required and the system will be perfect (sort of). If a PHP process has nothing to do then it returns immediately, so we end up in a situation where threads and processes are constantly created and destroyed, which is not the most efficient use of resources. The solution was to check the exit status of the PHP process. If it is zero then all is OK: the thread ends and its slot is available to be restarted immediately as normal. If, however, the exit status is non-zero then the slot is marked as “sleeping” and cannot be restarted by the main thread for another 30 seconds.

If the PHP script doesn’t find any rows in the database to process then it returns 250 (a non-zero, non-reserved exit status) and so the calling thread in the wrapper sleeps for 30 seconds before trying again to see if there are any new items in the database to process.

This has the added advantage that if anything goes wrong in the PHP script, such as a fatal error, then the thread will sleep and you won’t end up with perpetual thread cycling.

Code listing

Here is the code for the wrapper. The code should be fairly straightforward; with the above notes and the inline comments you should be able to figure out what’s going on. Just don’t forget to compile with the pthread library (for example, by passing -pthread to gcc). Comments are of course very welcome!

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <time.h>       /* time() */
#include <unistd.h>     /* fork(), execv(), setsid(), usleep() */
#include <sys/wait.h>

#define NUM_THREADS 8
#define THREAD_STACK_HEADROOM 1048576

pthread_mutex_t mutexSignal;
int slots[NUM_THREADS];
int handledSignal = -1;

void* signalHandler(void* arg);
void waitForThreads();
void killall();
void *task(void *i);

/**
 * Each thread will do this
 *
 */
void *task(void *i)
{
    sigset_t signalSet;
    int iid;
    char *tid;
    int stat_loc;

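    /* The slot index was passed by value inside the pointer; go via long to avoid a size mismatch on 64-bit systems */
    iid = (int)(long)i;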

    /* Say hello and show the thread number */
    printf("Thread %d: starting\n", iid);

    /* Spawn a child to run the program. */
    pid_t pid=fork();

    switch (pid)
    {
        case 0:
            setsid();
            sigfillset(&signalSet);
            pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL );

            tid = malloc(11*sizeof(char));

            sprintf(tid, "tid=%d", iid);

            char *argv[]={"php", "./sleep.php", tid, NULL};

            execv("/usr/bin/php",argv);
            free(tid);
            exit(EXIT_FAILURE); /* only if execv fails */
        case -1:
            printf("Thread %d: Fork failed\n", iid);
            slots[iid] = 0;
            break;
        default:
            /*  parent process */
            slots[iid] = pid;

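            if (pid == waitpid(pid, &stat_loc, 0)) /* wait for child to exit (WUNTRACED would also return for a stopped child) */
            {
                printf("Thread %d: Child finished with exit code: %d", iid, WEXITSTATUS(stat_loc));

                /* A non-zero exit status, or death by signal, means back off */
                if (!WIFEXITED(stat_loc) || WEXITSTATUS(stat_loc) != 0)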
                {
                    /* Sleep this thread, set available time to now+30s */
                    printf(" Going to sleep\n");
                    slots[iid] = -1 * (time(0) + 30);
                }
                else
                {
                    /* Free this thread slot */
                    printf(" Returning to pool\n");
                    slots[iid] = 0;
                }
            }
            else
            {
                printf("Thread %d: Bad exit\n", iid);
                slots[iid] = 0;
            }
    }

    printf("Thread %d: Finished\n", iid);

    /* Terminate the thread */
    pthread_exit((void*) i);
}

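void killall()
{
    int kv,i;

    for (i=0; i<NUM_THREADS;i++)
    {
        /* Only positive values are PIDs; kill() with 0 or a negative value would signal whole process groups */
        if (slots[i] <= 0)
        {
            continue;
        }

        kv = kill(slots[i], SIGINT);
        printf("Killed %d: %s\n", slots[i], kv ==0?"ok":"fail");
    }
}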

void waitForThreads()
{
    int i, stoppedThreads;

    printf("Waiting for threads to finish\n");

    while(1)
    {
        stoppedThreads = 0;

        for (i=0; i<NUM_THREADS;i++)
        {
            /* Check if thread has finished or is sleeping */
            if (slots[i] == 0 || slots[i] < -1)
            {
                stoppedThreads++;
            }
        }

        if (stoppedThreads == NUM_THREADS)
        {
            break;
        }

        /* Don't spin at 100% CPU while waiting */
        usleep(100000);
    }
}

void* signalHandler( void* arg )
{
    sigset_t signal_set;
    int sig;

    while (1)
    {
        /* Wait for any and all signals */
        sigfillset(&signal_set);

        sigwait(&signal_set, &sig);

        /* When we get this far, we've caught a signal */

        switch(sig)
        {
            /* SIGQUIT */
            case SIGTERM:
            case SIGQUIT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGQUIT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* SIGINT */
            case SIGINT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGINT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* other signals */
            default:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = 0;
                pthread_mutex_unlock(&mutexSignal);
                break;
        }
    }

    return (void*)0;
}

/**
 * Main
 *
 */
int main()
{
    int rc, i;
    int running = 1;
    size_t stacksize;
    sigset_t signalSet;
    pthread_t threadSignalHandler;
    pthread_t thread[NUM_THREADS];
    pthread_attr_t attr;

    /* Initialise the signal mutex lock */
    pthread_mutex_init(&mutexSignal, NULL);

    /* Initialise and set thread attributes */
    pthread_attr_init(&attr);
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    /* Ensure each thread has a given stack size - portability */
    pthread_attr_getstacksize(&attr, &stacksize);
    printf("Default stack size = %li\n", (long) stacksize);

    /* Determine and set a new stack size */
    stacksize = sizeof(sigset_t) + THREAD_STACK_HEADROOM;
    printf("Amount of stack calculated per thread = %li\n", (long) stacksize);
    pthread_attr_setstacksize (&attr, stacksize);

    /* block all signals */
    sigfillset(&signalSet);
    pthread_sigmask(SIG_BLOCK, &signalSet, NULL );

    /* create the signal handling thread */
    pthread_create(&threadSignalHandler, NULL, signalHandler, NULL);

    while (running > 0)
    {
        /* Find an empty slot */
        for (i=0; i<NUM_THREADS;i++)
        {
            if (slots[i] == 0)  /* Thread is currently not doing anything, so let's give it something to do */
            {
                /* Set this slot as used, -1 is a temporary value before being replaced by the PID of the forked process (prevents race hazards) */
                slots[i] = -1;

                /* This slot is spare */
                printf("Main: creating thread %d\n", i);

                /* Create a new thread */
                rc = pthread_create(&thread[i], &attr, task, (void *)(long)i);

                if (rc)
                {
                    printf("ERROR: return code from pthread_create() is %d\n", rc);
                    exit(1);
                }

                break;
            }
            else if (slots[i] < -1)     /* Thread is sleeping, see if we should wake it up */
            {
                if (time(0) > (-1 * slots[i]) )
                {
                    /* Thread has slept long enough so let's wake it up */
                    slots[i] = 0;
                }
            }
        }

        pthread_mutex_lock(&mutexSignal);

        switch ( handledSignal )
        {
            case -1:
                break;

            case 0:
                /* The case for signals we're not interested in */
                handledSignal = -1;
                break;

            case SIGTERM:
            case SIGQUIT:
                printf("Main: SIGQUIT\n");
                handledSignal = -1;
                running = -1;
                killall();
                break;

            case SIGINT:
                printf("Main: SIGINT\n" );
                handledSignal = -1;
                running = -1;
                break;
        }

        pthread_mutex_unlock(&mutexSignal);

        /* Sleep for a bit - all this thread creation is hard work! */
        usleep(100000);
    }

    waitForThreads();

    printf("Bye\n");

    return 0;
}

And here’s a sample PHP script (but you can of course have the wrapper call anything).

<?php

list($junk, $tid) = explode("=", $argv[1], 2);

printf("PHP: Hello from thread %d\n", $tid);

sleep(mt_rand(4,14));

// Non-zero exit status if the calling thread should sleep
// Chosen 250 because it's not a reserved status
// NOTE: PHP returns 255 on fatal error
exit(250);

Exercises left to the reader

The above code is not really intended to be production-ready (although I have actually used it in production), and there are still plenty of loose ends that could do with tidying up. One thing you might want to do is detach the child PHP processes from the parent process’s output streams, thereby effectively sending them to the background. For inspiration, take a look at this basic daemonising program: A Simple Daemon in C.

Another nice addition might be to allow the child process to be specified in command line arguments to the wrapper. It would be useful to specify the sleep time for threads and also the maximum number of concurrent threads – this way you wouldn’t have to recompile each time you changed something.

If you find this useful or interesting then comments would be greatly welcomed!

Categories: C, PHP

A Simple Daemon in C

December 10th, 2009 Nick No comments

A daemon is a process which runs in the background of your computer, periodically carrying out a specific task. The following is an example of a simple daemon written in C. It works by forking to create a child process; the parent then terminates but the child carries on in the background, entering a continuous loop of doing a task and then sleeping. The child process is of course an identical copy of the parent, so care must be taken to close all file descriptors, thus detaching the child completely from the calling process. The daemonised process is controlled by sending it signals, which it can catch and act on accordingly. In the example below, the process is terminated by sending SIGINT or SIGTERM, but you can of course add your own handling, for example to re-read config data on SIGHUP.

#include <stdio.h>
#include <signal.h>
#include <syslog.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>   /* umask() */

#define DAEMON_NAME "simpledaemon"

    void daemonShutdown();
    void signal_handler(int sig);
    void daemonize(char *rundir, char *pidfile);

    int pidFilehandle;

    void signal_handler(int sig)
    {
        switch(sig)
        {
            case SIGHUP:
                syslog(LOG_WARNING, "Received SIGHUP signal.");
                break;
            case SIGINT:
            case SIGTERM:
                syslog(LOG_INFO, "Daemon exiting");
                daemonShutdown();
                exit(EXIT_SUCCESS);
                break;
            default:
                syslog(LOG_WARNING, "Unhandled signal %s", strsignal(sig));
                break;
        }
    }

    void daemonShutdown()
    {
        close(pidFilehandle);
    }

    void daemonize(char *rundir, char *pidfile)
    {
        int pid, sid, i;
        char str[10];
        struct sigaction newSigAction;
        sigset_t newSigSet;

        /* Check if the parent process is init (PID 1) */
        if (getppid() == 1)
        {
            /* Our parent is init, therefore we are already a daemon */
            return;
        }

        /* Set signal mask - signals we want to block */
        sigemptyset(&newSigSet);
        sigaddset(&newSigSet, SIGCHLD);  /* ignore child - i.e. we don't need to wait for it */
        sigaddset(&newSigSet, SIGTSTP);  /* ignore Tty stop signals */
        sigaddset(&newSigSet, SIGTTOU);  /* ignore Tty background writes */
        sigaddset(&newSigSet, SIGTTIN);  /* ignore Tty background reads */
        sigprocmask(SIG_BLOCK, &newSigSet, NULL);   /* Block the above specified signals */

        /* Set up a signal handler */
        newSigAction.sa_handler = signal_handler;
        sigemptyset(&newSigAction.sa_mask);
        newSigAction.sa_flags = 0;

            /* Signals to handle */
            sigaction(SIGHUP, &newSigAction, NULL);     /* catch hangup signal */
            sigaction(SIGTERM, &newSigAction, NULL);    /* catch term signal */
            sigaction(SIGINT, &newSigAction, NULL);     /* catch interrupt signal */

        /* Fork*/
        pid = fork();

        if (pid < 0)
        {
            /* Could not fork */
            exit(EXIT_FAILURE);
        }

        if (pid > 0)
        {
            /* Child created ok, so exit parent process */
            printf("Child process created: %d\n", pid);
            exit(EXIT_SUCCESS);
        }

        /* Child continues */

        umask(027); /* Set file permissions 750 */

        /* Get a new process group */
        sid = setsid();

        if (sid < 0)
        {
            exit(EXIT_FAILURE);
        }

        /* close all descriptors */
        for (i = getdtablesize(); i >= 0; --i)
        {
            close(i);
        }

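        /* Route standard I/O to /dev/null so that descriptors 0-2 stay reserved
           (otherwise the PID file below would be opened as descriptor 0) */
        i = open("/dev/null", O_RDWR);

        if (i != -1)
        {
            dup2(i, STDIN_FILENO);
            dup2(i, STDOUT_FILENO);
            dup2(i, STDERR_FILENO);

            if (i > STDERR_FILENO)
            {
                close(i);
            }
        }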

        chdir(rundir); /* change running directory */

        /* Ensure only one copy */
        pidFilehandle = open(pidfile, O_RDWR|O_CREAT, 0600);

        if (pidFilehandle == -1 )
        {
            /* Couldn't open lock file */
            syslog(LOG_INFO, "Could not open PID lock file %s, exiting", pidfile);
            exit(EXIT_FAILURE);
        }

        /* Try to lock file */
        if (lockf(pidFilehandle,F_TLOCK,0) == -1)
        {
            /* Couldn't get lock on lock file */
            syslog(LOG_INFO, "Could not lock PID lock file %s, exiting", pidfile);
            exit(EXIT_FAILURE);
        }

        /* Get and format PID */
        sprintf(str,"%d\n",getpid());

        /* write pid to lockfile */
        write(pidFilehandle, str, strlen(str));
    }

    int main()
    {
        /* Debug logging
        setlogmask(LOG_UPTO(LOG_DEBUG));
        openlog(DAEMON_NAME, LOG_CONS, LOG_USER);
        */

        /* Logging */
        setlogmask(LOG_UPTO(LOG_INFO));
        openlog(DAEMON_NAME, LOG_CONS | LOG_PERROR, LOG_USER);

        syslog(LOG_INFO, "Daemon starting up");

        /* Daemonize */
        daemonize("/tmp/", "/tmp/daemon.pid");

        syslog(LOG_INFO, "Daemon running");

        while (1)
        {
            syslog(LOG_INFO, "daemon says hello");

            sleep(1);
        }
    }

The above example is simple, but it serves as a good starting point for creating your own daemon. In my next article I will demonstrate how it can be extended to periodically call a PHP script.

If I find enough time I will also demonstrate how it can be extended from being a single-threaded daemon to a multi-threaded daemon, maintaining a continuous pool of x threads, each doing a given task.

Linked Lists in C: push, pop, shift and unshift

October 22nd, 2009 Nick No comments

A sound understanding of linked lists and how to use them is a prerequisite for programming in C, or indeed most programming languages for that matter. Most scripting languages provide built-in helper functions for dealing with arrays and linked lists; C, on the other hand, doesn’t, so you have to do it yourself. Luckily, there’s really not that much to it, but if you’re new to C then it might be quite daunting to realise you’re in the deep end with no armbands!

There are plenty of great books and online tutorials that cover the topic very well; however, personally I prefer to look at working examples, and so I have created a few for those who are already aware of the theory and want to get on with programming.

All of the examples in this tutorial use a fairly simple struct for linked list nodes, containing just an integer and a pointer to the next node in the list. (You can of course change your nodes to better suit your requirements, but you will have to change the helper functions a bit to reflect this.)

It should be fairly simple to figure out what’s going on from the inline comments, so I’ll stop waffling now and get on with the code!

Node

Here is the struct that defines each item in the list.

struct node
{
    int val;
    struct node *next;
};

Note that I have decided to store each item’s data as an integer, but you can of course choose other types so long as you alter the functions below to accommodate this type. Basically, the struct can be whatever so long as it has a pointer to the next item in the list.

Push

Creates a new node and adds it onto the end of the list.

/**
 * Adds a node onto the end of the list
 *
 */
void push(struct node** item, int data)
{
    struct node *current;
    struct node *endItem;

    current = *item;

    /* See if the passed value is a node */
    if (current)
    {
        /* See if this is the last node in the list */
        if (current->next == NULL)
        {
            /* This is the last item, so create a new node to go on the end */

            /* Allocate memory for the new node */
            endItem = malloc(sizeof(struct node));

            /* Set the value */
            endItem->val = data;

            /* Set the pointer to the next item to be NULL (there isn't a next, this will be the last item) */
            endItem->next  = NULL;

            /* Set the next pointer in the last item to the new item, making the new item now the last one */
            current->next = endItem;

            return;
        }
        else
        {
            /* This is not the last item, recurse to the next one */
            /* (push() returns void, so call it and then return separately) */
            push(&current->next, data);
            return;
        }
    }

    /* There are no nodes in the list, so create a new node and it will be the first in the list */
    endItem = malloc(sizeof(struct node));

    /* Set the value */
    endItem->val = data;

    /* Set the pointer to the next item to be NULL (there isn't a next, this will be the last item) */
    endItem->next  = NULL;

    /* Set the passed pointer to point to the new node */
    *item = endItem;
}

Pop

Removes the last node from the end of the list and returns its value.

/**
 * Remove the last item and return the value
 *
 */
int pop (struct node **item, int *p)
{
    struct node *current;

    current = *item;

    if (current)
    {
        /* See if the current item specifies a following item */
        if (current->next)
        {
            /* There is a next item, so see if it is the last one */
            if (current->next->next == NULL)
            {
                /* The next item is the last one, so get its value */
                *p = current->next->val;

                /* Free the memory reserved for the last item */
                free(current->next);

                /* Remove the pointer to the last item */
                current->next = NULL;

                return 0;
            }

            /* Move on to the next item */
            return pop(&current->next, p);
        }

        /* There aren't any more items, this is the last one and therefore also the first */
        *p = current->val;

        /* Release the memory reserved for it */
        free(*item);

        /* Set the head node to NULL */
        *item = NULL;

        return 0;
    }

    /* There are no items in the list, so *p is left untouched
       (assigning NULL to the local copy of p would have no effect for the caller) */

    return -1;
}

Shift

Removes the first element from the list and returns its value.

/**
 * Remove the first item in the list
 *
 */
int shift(struct node **head, int *p)
{
    struct node *firstNode;

    firstNode = *head;

    /* See if there is a first node */
    if (firstNode == NULL)
    {
        /* There isn't, so return -1*/
        return -1;
    }

    /* Set the passed pointer, the pointer to the first node in the list, to the next node */
    *head = firstNode->next;

    /* Get the value from the first node before we free() it */
    *p = firstNode->val;

    /* Deallocate the first node */
    free(firstNode);

    /* Return 0, all ok */
    return 0;
}

Unshift

Creates a new node and adds it onto the beginning of the list.

/**
 * Add a node onto the beginning of the list
 *
 */
void unshift(struct node** head, int data)
{
    /* Declare and reserve memory for the new item */
    struct node* newNode = malloc(sizeof(struct node));

    /* Set the new node's value */
    newNode->val = data;

    /* Set the next item to be the item that was previously first */
    newNode->next = *head;

    /* Set the pointer to the first item to this new item */
    *head = newNode;
}

Things to bear in mind

If you're used to languages such as Java or PHP, you'll know there's no need to tidy up after yourself. After all, that's what the garbage collector is for! Once you're finished with a variable, just remove all references to it and the memory allocated to it will be magically freed to be used again. In C there is no such thing, so you have to be a bit more responsible about the mess you make and clean up afterwards, otherwise your program will run wild and the system administrator will come and bash you over the head (probably with a large book about memory management).

You might have noticed that in the functions pop() and shift(), there is a call to free() on the node we are removing. free() does exactly what it says on the tin: it frees the memory reserved for the variable addressed by the pointer it is passed. Note that before we create a new node, we must first allocate memory for it by calling malloc(). free() deallocates this space so that the process can re-use it in the future. If we didn’t call free() then we would have to keep using more and more memory to store new nodes in, which could soon get out of hand if you create and delete lots of nodes.

So how do we make sure we’re not eating memory?

I made a simple function (see below) that continuously adds and then removes nodes onto and from the list – if we are correctly freeing memory then we should see that the memory used by the program when run will be stable.

int main()
{
    struct node *head;
    int i, j;

    head = NULL;

    while (1)
    {

        for (i=1;i<6;i++)
        {
            /* Add to the beginning */
            unshift(&head, i);
            printf("Unshift: %d\n", i);
        }

        for (i=6;i<11;i++)
        {
            /* Add to the end */
            push(&head, i);
            printf("Push: %d\n", i);
        }

        for (i=1;i<6;i++)
        {
            /* Remove from the end */
            if (pop(&head, &j) == 0)
            {
                printf("Pop: %d\n", j);
            }
        }

        for (i=1;i<6;i++)
        {
            /* Remove from the beginning */
            if (shift(&head, &j) == 0)
            {
                printf("Shift: %d\n", j);
            }
        }

        /* Functions in a different order */

        for (i=1;i<6;i++)
        {
            /* Add to the end */
            push(&head, i);
            printf("Push: %d\n", i);
        }

        for (i=6;i<11;i++)
        {
            /* Add to the beginning */
            unshift(&head, i);
            printf("Unshift: %d\n", i);
        }

        for (i=1;i<6;i++)
        {
            /* Remove from the beginning */
            if (shift(&head, &j) == 0)
            {
                printf("Shift: %d\n", j);
            }
        }

        for (i=1;i<6;i++)
        {
            /* Remove from the end */
            if (pop(&head, &j) == 0)
            {
                printf("Pop: %d\n", j);
            }
        }
    }

    return 0;
}

Compile:
gcc llist.c -ollist

Make sure the binary is executable (gcc normally sets this for you):
chmod +x llist

Run:
./llist

Now, in another terminal we can watch how much memory the program is using:
watch 'ps aux | grep llist'

Just for fun
Comment out one of the free() calls in either pop() or shift(), recompile, run and watch the new program. You should see the memory it is using steadily increase.

Download:

Here’s a file containing all of the above functions and the test.
Linked List Tutorial v1.0