Multitasking: PHP in parallel

Update: I have recently solved the issue of multitasking in PHP with a standalone application, The Fat Controller, which handles multitasking for you. Read more here.

Like most websites, glogster.com periodically sends out an e-newsletter to its user base. This is done by a simple PHP script that opens a socket to a mail server and goes through the user database, sending an email to each user. This approach worked fine until recently, when the number of users went up dramatically; it now takes an infeasibly long time to send emails to all our users.

After optimising the script as much as possible, the only remaining option was to run many instances of the script simultaneously. PHP doesn’t support multithreading, and although PHP does support POSIX-style process control ( http://www.php.net/manual/en/book.pcntl.php ), I decided to create a wrapper in C that would handle multiple instances of PHP. (I haven’t found anything technically wrong with PHP’s process control; I just needed something stable, reliable and potentially able to multitask other scripts, such as Python.)

How it works

The wrapper I made (see below for the code listing) is fairly simple. It creates a series of threads, each of which forks a child process that is then replaced by a PHP instance running the emailing script. The thread in the parent process waits for the PHP process to finish and then ends itself. Once a thread ends, the main thread creates another one, thereby maintaining a constant number of threads. Signals are handled by a separate thread, which can either tell the main thread to stop creating new threads and so shut the program down safely (SIGINT), or have all child processes terminated directly and so end the program abruptly (SIGTERM or SIGQUIT).
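The fork, exec and wait step that each worker thread performs can be sketched on its own. This is a minimal sketch rather than the wrapper itself: the function name `run_and_wait` and the fallback exit code 127 are made up for illustration, and the threads, slots and signal handling are left out.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper: run the program at `path` with arguments `argv`
 * in a child process and block until it has finished.
 * Returns the child's exit status (0-255), or -1 if the fork failed or
 * the child did not exit normally (for example, it was killed by a signal).
 */
int run_and_wait(const char *path, char *const argv[])
{
    int status;
    pid_t pid = fork();

    if (pid == -1)
    {
        return -1; /* fork failed */
    }

    if (pid == 0)
    {
        /* Child: replace this process with the requested program */
        execv(path, argv);
        _exit(127); /* only reached if execv fails */
    }

    /* Parent: wait for this specific child to terminate */
    if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status))
    {
        return -1;
    }

    return WEXITSTATUS(status);
}
```

In the wrapper, each thread does the equivalent of calling this once with /usr/bin/php and the script's arguments, then records the result in its slot.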

Almost done

So now I had a way of running any number of PHP processes simultaneously and was almost ready to send emails at lightning speed. However, one problem remained. Once an email has been sent to a user, the database is updated so that I know which user has received which email, which prevents me from sending the same user the same email more than once. The problem is this: what if one instance of the PHP script reads a user and, before it has had time to send the email and update the database, another PHP instance reads the same user and also sends them the email? One way to solve this would be to lock the database table before reading and updating each row, but this would slow the database down, which is not ideal.

The solution I came up with was to limit the number of simultaneous PHP instances to 8 and have the wrapper send each PHP process a unique instance ID from 0 to 7. When selecting users from the database, the script looks at the 3 least significant bits (LSBs) of the IDs in the recipients table and selects only those rows whose 3 LSBs equal the instance ID passed from the wrapper.

SELECT *,
(
	if( (`id` | 1) = `id`, 1, 0) +
	if( (`id` | 2) = `id`, 2, 0) +
	if( (`id` | 4) = `id`, 4, 0)
) AS `IID`
FROM `recipients`
WHERE `sent` = 0
HAVING `IID` = :IID
LIMIT :OFFSET, :BATCH_SIZE;

Just bind the instance ID passed from the wrapper into the query and each instance will now select recipients that are unique to that ID – no two concurrent instances can select the same recipients. Just don’t forget to update each row once each email has been sent.

Note: Using OFFSET in a LIMIT clause is inefficient on large tables. For an alternative, see: http://www.4pmp.com/2010/02/scalable-mysql-avoid-offset-for-large-tables/
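The three `if` terms in the query add up to the value of the three lowest bits of `id`, which is the same as `id & 7` (that is, `id` modulo 8). The following C sketch computes the value both ways so the equivalence can be checked. The function names are invented for illustration and the code is not part of the wrapper.

```c
#include <stdint.h>

/* Mirrors the SQL expression: test bits 1, 2 and 4 one at a time
   and add up the ones that are set */
unsigned int iid_from_bit_tests(uint32_t id)
{
    unsigned int iid = 0;

    if ((id | 1u) == id) iid += 1; /* bit 0 is set */
    if ((id | 2u) == id) iid += 2; /* bit 1 is set */
    if ((id | 4u) == id) iid += 4; /* bit 2 is set */

    return iid;
}

/* The same value in one step: keep only the 3 least significant bits */
unsigned int iid_from_mask(uint32_t id)
{
    return id & 7u;
}
```

Since MySQL also supports the `&` operator, the same filter could probably be written as a plain condition such as `(id & 7) = :IID` in the WHERE clause, although I have not measured whether that makes any difference here.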

Finishing touches

We’re almost there; just one more little feature is required and the system will be perfect (sort of). If a PHP process has nothing to do then it returns immediately, so we end up in a situation where threads and processes are constantly created and destroyed, which is not the most efficient use of resources. The solution was to check the exit status of the PHP process. If it is zero then all is OK: the thread ends and its slot is available to be restarted immediately as normal. If, however, the exit status is non-zero then the slot is marked as “sleeping” and cannot be restarted by the main thread for another 30 seconds.

If the PHP script doesn’t find any rows in the database to process then it returns 250 (a non-zero, non-reserved exit status), and so the wrapper leaves that slot sleeping for 30 seconds before trying again to see if there are any new items in the database to process.

This has the added advantage that if anything goes wrong in the PHP script, such as a fatal error, then the thread will sleep and you won’t end up with perpetual thread cycling.
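The slot bookkeeping behind this can be sketched as two small functions. The encoding follows the listing below (0 for a free slot, -1 for a slot that is starting, a positive PID for a running child and a negated wake-up time for a sleeping slot). The function names and the `SLEEP_SECONDS` constant are invented for this sketch, and it uses `long` where the listing uses `int`.

```c
#include <time.h>

#define SLEEP_SECONDS 30 /* assumed name for the 30 second back-off */

/*
 * Value to store in a slot once its child has exited:
 * 0 marks the slot as free, a negative value encodes the
 * time after which the slot may be used again.
 */
long slot_after_exit(int exit_status, time_t now)
{
    if (exit_status == 0)
    {
        return 0; /* work was done: restart immediately */
    }

    return -((long)now + SLEEP_SECONDS); /* nothing to do, or an error: back off */
}

/* Non-zero if the main loop may start a new child in this slot */
int slot_is_available(long slot, time_t now)
{
    if (slot == 0)
    {
        return 1; /* free */
    }

    if (slot < -1)
    {
        return (long)now > -slot; /* sleeping: available once the wake-up time has passed */
    }

    return 0; /* -1 (starting) or a positive PID (running) */
}
```

Because any non-zero status triggers the back-off, the script's deliberate 250 and PHP's 255 on a fatal error are treated the same way.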

Code listing

Here is the code for the wrapper. It should be fairly straightforward: with the above notes and the inline comments you should be able to figure out what’s going on. Just don’t forget to compile with the pthread library (for example, with gcc’s -pthread option). Comments are of course very welcome!

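#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <time.h>       /* time() */
#include <unistd.h>     /* fork(), execv(), setsid(), usleep() */
#include <sys/types.h>  /* pid_t */
#include <sys/wait.h>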


#define NUM_THREADS 8
#define THREAD_STACK_HEADROOM 1048576

pthread_mutex_t mutexSignal;
int slots[NUM_THREADS];
int handledSignal = -1;

void* signalHandler(void* arg);
void waitForThreads();
void killall();
void *task(void *i);


/**
 * Each thread will do this
 *
 */
void *task(void *i)
{
    sigset_t signalSet;
    int iid;
    char *tid;
    int stat_loc;
    
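    /* The slot number was passed by value inside the pointer; go via long to avoid a size mismatch on 64-bit systems */
    iid = (int)(long)i;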

    /* Say hello and show the thread number */
    printf("Thread %d: starting\n", iid);
    
    /* Spawn a child to run the program. */
    pid_t pid=fork();
    
    switch (pid)
    {
        case 0:
            setsid();
            sigfillset(&signalSet);
            pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL );
        
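            /* Room for "tid=", up to 11 characters for an int, and the terminator */
            tid = malloc(16*sizeof(char));
            if (tid == NULL)
            {
                _exit(EXIT_FAILURE);
            }

            snprintf(tid, 16, "tid=%d", iid);

            char *argv[]={"php", "./sleep.php", tid, NULL};

            execv("/usr/bin/php",argv);
            perror("execv"); /* only reached if execv fails */
            free(tid);
            _exit(EXIT_FAILURE);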
        case -1:
            printf("Thread %d: Fork failed\n", iid);
            slots[iid] = 0;
            break;
        default:
            /*  parent process */
            slots[iid] = pid;
            
            /* Wait for the child to terminate; WEXITSTATUS is only meaningful if it exited normally */
            if (pid == waitpid(pid,&stat_loc,0) && WIFEXITED(stat_loc))
            {
                printf("Thread %d: Child finished with exit code: %d", iid, WEXITSTATUS(stat_loc));
                
                if (WEXITSTATUS(stat_loc) != 0)
                {
                    /* Sleep this thread, set available time to now+30s */
                    printf(" Going to sleep\n");
                    slots[iid] = -1 * (time(0) + 30);
                }
                else
                {
                    /* Free this thread slot */
                    printf(" Returning to pool\n");
                    slots[iid] = 0;
                }
            }
            else
            {
                printf("Thread %d: Bad exit\n", iid);
                slots[iid] = 0;
            }
    }


    printf("Thread %d: Finished\n", iid);

    /* Terminate the thread */
    pthread_exit((void*) i);
}

void killall()
{
    int kv,i;
    
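    for (i=0; i<NUM_THREADS;i++)
    {
        /* Skip free (0), starting (-1) and sleeping (< -1) slots: kill() treats
           a zero or negative PID as a process group, not a single process */
        if (slots[i] <= 0) continue;

        kv = kill(slots[i], SIGINT);
        printf("Killed %d: %s\n", slots[i], kv ==0?"ok":"fail");
    }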
}

void waitForThreads()
{
    int i, stoppedThreads;
    
    printf("Waiting for threads to finish\n");
    
    while(1)
    {
        stoppedThreads = 0;
        
        for (i=0; i<NUM_THREADS;i++)
        {
            /* Check if thread has finished or is sleeping */
            if (slots[i] == 0 || slots[i] < -1)
            {
                stoppedThreads++;
            }
        }
        
        if (stoppedThreads == NUM_THREADS)
        {
            break;
        }

        /* Pause between checks rather than spinning at full CPU */
        usleep(100000);
    }
}

void* signalHandler( void* arg )
{
    sigset_t signal_set;
    int sig;

    while (1)
    {
        /* Wait for any and all signals */
        sigfillset(&signal_set);
        
        sigwait(&signal_set, &sig);

        /* When we get this far, we've caught a signal */

        switch(sig)
        {
            /* SIGTERM and SIGQUIT are both reported to the main thread as SIGQUIT */
            case SIGTERM:
            case SIGQUIT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGQUIT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* SIGINT */
            case SIGINT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGINT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* other signals */
            default:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = 0;
                pthread_mutex_unlock(&mutexSignal);
                break;
        }
    }
    
    return (void*)0;
}


/**
 * Main
 *
 */
int main()
{
    int rc, i;
    int running = 1;
    size_t stacksize;
    sigset_t signalSet;
    pthread_t threadSignalHandler;
    pthread_t thread[NUM_THREADS];
    pthread_attr_t attr;

    /* Initialise the signal mutex lock */
    pthread_mutex_init(&mutexSignal, NULL);
    
    /* Initialise and set thread attributes */
    pthread_attr_init(&attr);
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    
    /* Ensure each thread has a given stack size - portability */
    pthread_attr_getstacksize(&attr, &stacksize);
    printf("Default stack size = %li\n", (long) stacksize);

    /* Determine and set a new stack size */
    stacksize = sizeof(sigset_t) + THREAD_STACK_HEADROOM;
    printf("Amount of stack calculated per thread = %li\n", (long) stacksize);
    pthread_attr_setstacksize (&attr, stacksize);


    /* block all signals */
    sigfillset(&signalSet);
    pthread_sigmask(SIG_BLOCK, &signalSet, NULL );

    /* create the signal handling thread */
    pthread_create(&threadSignalHandler, NULL, signalHandler, NULL);

    while (running > 0)
    {
        /* Find an empty slot */
        for (i=0; i<NUM_THREADS;i++)
        {
            if (slots[i] == 0)  /* Thread is currently not doing anything, so let's give it something to do */
            {
                /* Set this slot as used, -1 is a temporary value before being replaced by the PID of the forked process (prevents race hazards) */
                slots[i] = -1;
                
                /* This slot is spare */
                printf("Main: creating thread %d\n", i);
                
                /* Create a new thread */
                rc = pthread_create(&thread[i], &attr, task, (void *)(long)i);

                if (rc)
                {
                    printf("ERROR: return code from pthread_create() is %d\n", rc);
                    exit(1);
                }
                
                break;
            }
            else if (slots[i] < -1)     /* Thread is sleeping, see if we should wake it up */
            {
                if (time(0) > (-1 * slots[i]) )
                {
                    /* Thread has slept long enough so let's wake it up */
                    slots[i] = 0;
                }
            }
        }
        
        pthread_mutex_lock(&mutexSignal);

        switch ( handledSignal )
        {
            case -1:
                break;

            case 0:
                /* The case for signals we're not interested in */
                handledSignal = -1;
                break;

            case SIGTERM:
            case SIGQUIT:
                printf("Main: SIGQUIT\n");
                handledSignal = -1;
                running = -1;
                killall();
                break;

            case SIGINT:
                printf("Main: SIGINT\n" );
                handledSignal = -1;
                running = -1;
                break;
        }
        
        pthread_mutex_unlock(&mutexSignal);
        
        /* Sleep for a bit - all this thread creation is hard work! */
        usleep(100000);
    }

    waitForThreads();

    printf("Bye\n");
} 

And here’s a sample PHP script (but you can of course have the wrapper call anything).

<?php

list($junk, $tid) = explode("=", $argv[1], 2);

printf("PHP: Hello from thread %d\n", $tid);

sleep(mt_rand(4,14));

// Non-zero exit status if the calling thread should sleep
// Chosen 250 because it's not a reserved status
// NOTE: PHP returns 255 on fatal error
exit(250);

Exercises left to the reader

The above code is not really intended to be production-ready (although I have actually used it in production), and there are still plenty of loose ends that could do with tidying up. One thing you might want to do is detach the child PHP processes from the parent process’s output streams, thereby effectively sending them to the background. For inspiration take a look at this basic daemonising program: A simple daemon in C.
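One possible way to detach the children from the wrapper's output, assuming it is acceptable to discard whatever they print, is to point the standard streams at /dev/null in the child just before the exec call. The function name below is hypothetical and this is only a sketch of the idea.

```c
#include <fcntl.h>
#include <unistd.h>

/*
 * Hypothetical helper, intended to be called in the child between
 * fork() and execv(): redirects stdin, stdout and stderr to /dev/null.
 * Returns 0 on success, -1 on failure.
 */
int redirect_stdio_to_devnull(void)
{
    int fd = open("/dev/null", O_RDWR);

    if (fd == -1)
    {
        return -1;
    }

    if (dup2(fd, STDIN_FILENO) == -1 ||
        dup2(fd, STDOUT_FILENO) == -1 ||
        dup2(fd, STDERR_FILENO) == -1)
    {
        close(fd);
        return -1;
    }

    /* The descriptor from open() is no longer needed unless it is one of 0, 1 or 2 */
    if (fd > STDERR_FILENO)
    {
        close(fd);
    }

    return 0;
}
```

If the output should be kept, the same approach would work with a log file opened in place of /dev/null.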

Another nice addition might be to allow the child process to be specified in command line arguments to the wrapper. It would be useful to specify the sleep time for threads and also the maximum number of concurrent threads – this way you wouldn’t have to recompile each time you changed something.

If you find this useful or interesting then comments would be greatly welcomed!


One Response to Multitasking: PHP in parallel

  1. Andrej says:

    Hi Nick,

    very nice article.
    Sending a lot of emails is always a big issue, but it’s not a big programming problem. As you wrote, multithreading will decrease the execution time.
    The problem is that you will get banned (or flagged) very soon by most major mail services (Gmail, Yahoo, …).
    The solution is to buy many IPs from big providers, like theplanet or iweb. Beware of Amazon IPs; they are all flagged (it is a really big issue with Amazon: you cannot even set up a regular email server there, because all email going out from their IPs is flagged as spam).
    You pay about $1 per IP per month.
    Once you have 100+ IPs you can easily send as many emails as you want.


    #for i in 10 ; do php run_nicks_script.php ; done

    Or something like that :)

    Keep posting articles like this ! :)
    Andrej
