Archive

Archive for the ‘PHP’ Category

Multitasking: PHP in parallel

March 2nd, 2010 Nick 1 comment

Like most websites, glogster.com periodically sends out an e-newsletter to its user-base.    This is done by a simple PHP script that opens a socket to a mail server and goes through the user database, sending an email to each one.    This approach has worked fine until recently, the number of users has gone up dramatically and now it takes an infeasibly long time to send emails to all our users.

After optimising the script as much as possible, the only alternative option was to run many instances of the script simultaneously.   PHP doesn’t support multithreading, and although PHP does support POSIX style process control ( http://www.php.net/manual/en/book.pcntl.php ) I decided to create a wrapper in C that would handle multiple instances of PHP.   (I haven’t found anything technically wrong with PHP’s process control, I just needed something stable, reliable and potentially able to mulitask other scripts such as Python).

How it works

The wrapper I made (see below for code listing) is fairly simple and works by creating a series of threads, each of which forks and creates a PHP instance (replacing the child process) that runs the emailing script.   The thread from the parent process waits for the PHP process to finish and then ends itself.   Once a thread ends, the main thread creates another one, thereby maintaining a constant number of threads.   Signal handling is handled by a separate thread and can either tell the main thread to stop creating new threads and thereby safely shutdown the program or directly terminate all child processes and thus abruptly end the program.

Almost done

So now I had a way of running any number of PHP processes simultaneously and was almost ready to send emails at lightning speed, however one problem remained.   Once an email has been sent to a user, the database is updated so that I know which user has received which email and thus prevent me from sending the same user the same email more than once.   The problem is; what if one instance of the PHP script reads a user, and before it has time to send them the email and update the database, another PHP instance reads the same user and also sends them the email?   One way to solve this would be through locking the database table before reading and updating each row, however this would slow the database down which is not ideal.

The solution I came up with was to limit the number of simultaneous PHP instances to 8 and have the wrapper send each PHP process a unique instance ID from 0 to 7.   When selecting users from the database, the script looked at the 3 least significant bits (LSBs) of the IDs in the recipients table and selected only those rows whose 3 LSBs equated to the instance ID passed from the wrapper.

SELECT *,
(
	if( (`id` | 1) = `id`, 1, 0) +
	if( (`id` | 2) = `id`, 2, 0) +
	if( (`id` | 4) = `id`, 4, 0)
) AS `IID`
FROM `recipients`
WHERE `sent` = 0
HAVING `IID` = :IID
LIMIT :OFFSET, :BATCH_SIZE;

Just bind the instance ID passed from the wrapper into the query and each instance will now select recipients that are unique to that ID – no two concurrent instances can select the same recipients. Just don’t forget to update each row once each email has been sent.

Note: Using OFFSET in a limit clause is inefficient, for an alternative: http://www.4pmp.com/2010/02/scalable-mysql-avoid-offset-for-large-tables/

Finishing touches

We’re almost there, just one more little feature is required and the system will be perfect (sort of). If a thread process has nothing to do then it will return immediately, so we end up with a situation when thread processes are constantly created and destroyed which is not the most efficient use of resources. The solution was to check the exit status of the PHP process. If zero then all is ok, the thread ends and is available to be restarted immediately as normal. If however, the exit status is non-zero then the thread is marked as “sleeping” and cannot be restarted by the main thread for another 30 seconds.

If the PHP script doesn’t find any rows in the database to process then it returns 250 (a non-zero, non-reserved exit status) and so the calling thread in the wrapper sleeps for 30 seconds before trying again to see if there are any new items in the database to process.

This has the added advantage that if anything goes wrong in the PHP script, such as a fatal error, then the thread will sleep and you won’t end up with perpetuate thread cycling.

Code listing

Here is the code for the wrapper. The code should be fairly straightforward, with the above notes and the inline comments you should be able to figure out what’s going on – just don’t forget to compile with the pthread library. Comments are of course very welcome!

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <sys/wait.h>

#define NUM_THREADS 8
#define THREAD_STACK_HEADROOM 1048576

pthread_mutex_t mutexSignal;
int slots[NUM_THREADS];
int handledSignal = -1;

void* signalHandler(void* arg);
void waitForThreads();
void killall();
void *task(void *i);

/**
 * Each thread will do this
 *
 */
void *task(void *i)
{
    sigset_t signalSet;
    int iid;
    char *tid;
    int stat_loc;

    iid = (int)i;

    /* Say hello and show the thread number */
    printf("Thread %d: starting\n", iid);

    /* Spawn a child to run the program. */
    pid_t pid=fork();

    switch (pid)
    {
        case 0:
            setsid();
            sigfillset(&signalSet);
            pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL );

            tid = malloc(11*sizeof(char));

            sprintf(tid, "tid=%d", iid);

            char *argv[]={"php", "./sleep.php", tid, NULL};

            execv("/usr/bin/php",argv);
            free(tid);
            exit(EXIT_FAILURE); /* only if execv fails */
        case -1:
            printf("Thread %d: Fork failed\n", iid);
            slots[iid] = 0;
            break;
        default:
            /*  parent process */
            slots[iid] = pid;

            if (pid == waitpid(pid,&stat_loc,WUNTRACED)) /* wait for child to exit */
            {
                printf("Thread %d: Child finished with exit code: %d", iid, WEXITSTATUS(stat_loc));

                if (WEXITSTATUS(stat_loc) != 0)
                {
                    /* Sleep this thread, set available time to now+30s */
                    printf(" Going to sleep\n");
                    slots[iid] = -1 * (time(0) + 30);
                }
                else
                {
                    /* Free this thread slot */
                    printf(" Returning to pool\n");
                    slots[iid] = 0;
                }
            }
            else
            {
                printf("Thread %d: Bad exit\n", iid);
                slots[iid] = 0;
            }
    }

    printf("Thread %d: Finished\n", iid);

    /* Terminate the thread */
    pthread_exit((void*) i);
}

void killall()
{
    int kv,i;

    for (i=0; i<NUM_THREADS;i++)
    {
        kv = kill(slots[i], SIGINT);
        printf("Killed %d: %s\n", slots[i], kv ==0?"ok":"fail");
    }
}

void waitForThreads()
{
    int i, stoppedThreads;

    printf("Waiting for threads to finish\n");

    while(1)
    {
        stoppedThreads = 0;

        for (i=0; i<NUM_THREADS;i++)
        {
            /* Check if thread has finished or is sleeping */
            if (slots[i] == 0 || slots[i] < -1)
            {
                stoppedThreads++;
            }
        }

        if (stoppedThreads == NUM_THREADS)
        {
            break;
        }
    }
}

void* signalHandler( void* arg )
{
    sigset_t signal_set;
    int sig;

    while (1)
    {
        /* Wait for any and all signals */
        sigfillset(&signal_set);

        sigwait(&signal_set, &sig);

        /* When we get this far, we've caught a signal */

        switch(sig)
        {
            /* SIGQUIT */
            case SIGTERM:
            case SIGQUIT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGQUIT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* SIGINT */
            case SIGINT:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = SIGINT;
                pthread_mutex_unlock(&mutexSignal);
                break;

            /* other signals */
            default:
                pthread_mutex_lock(&mutexSignal);
                handledSignal = 0;
                pthread_mutex_unlock(&mutexSignal);
                break;
        }
    }

    return (void*)0;
}

/**
 * Main
 *
 */
int main()
{
    int rc, i;
    int running = 1;
    size_t stacksize;
    sigset_t signalSet;
    pthread_t threadSignalHandler;
    pthread_t thread[NUM_THREADS];
    pthread_attr_t attr;

    /* Initialise the signal mutex lock */
    pthread_mutex_init(&mutexSignal, NULL);

    /* Initialise and set thread attributes */
    pthread_attr_init(&attr);
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    /* Ensure each thread has a given stack size - portability */
    pthread_attr_getstacksize(&attr, &stacksize);
    printf("Default stack size = %li\n", (long) stacksize);

        /* Determine and set a new stack size */
        stacksize = sizeof(sigset_t) + THREAD_STACK_HEADROOM;
        printf("Amount of stack calculated per thread = %li\n", (long) stacksize);
        pthread_attr_setstacksize (&attr, stacksize);

    /* block all signals */
    sigfillset(&signalSet);
    pthread_sigmask(SIG_BLOCK, &signalSet, NULL );

    /* create the signal handling thread */
    pthread_create(&threadSignalHandler, NULL, signalHandler, NULL);

    while (running > 0)
    {
        /* Find an empty slot */
        for (i=0; i<NUM_THREADS;i++)
        {
            if (slots[i] == 0)  /* Thread is currently not doing anything, so let's give it something to do */
            {
                /* Set this slot as used, -1 is a temporary value before being replaced by the PID of the forked process (prevents race hazards) */
                slots[i] = -1;

                /* This slot is spare */
                printf("Main: creating thread %d\n", i);

                /* Create a new thread */
                rc = pthread_create(&thread[i], &attr, task, (void *)i);

                if (rc)
                {
                    printf("ERROR: return code from pthread_create() is %d\n", rc);
                    exit(1);
                }

                break;
            }
            else if (slots[i] < -1)     /* Thread is sleeping, see if we should wake it up */
            {
                if (time(0) > (-1 * slots[i]) )
                {
                    /* Thread has slept long enough so let's wake it up */
                    slots[i] = 0;
                }
            }
        }

        pthread_mutex_lock(&mutexSignal);

        switch ( handledSignal )
        {
            case -1:
                break;

            case 0:
                /* The case for signals we're not interested in */
                handledSignal = -1;
                break;

            case SIGTERM:
            case SIGQUIT:
                printf("Main: SIGQUIT\n");
                handledSignal = -1;
                running = -1;
                killall();
                break;

            case SIGINT:
                printf("Main: SIGINT\n" );
                handledSignal = -1;
                running = -1;
                break;
        }

        pthread_mutex_unlock(&mutexSignal);

        /* Sleep for a bit - all this thread creation is hard work! */
        usleep(100000);
    }

    waitForThreads();

    printf("Bye\n");
}

And here’s a sample PHP script (but you can of course have the wrapper call anything).

<?php

list($junk, $tid) = explode("=", $argv[1], 2);

printf("PHP: Hello from thread %d\n", $tid);

sleep(mt_rand(4,14));

// Non-zero exit status if the calling thread should sleep
// Chosen 250 because it's not a reserved status
// NOTE: PHP returns 255 on fatal error
exit(250);

Exercises left to the reader

The above code is not really intended as production-ready, (although I have actually used it in production) and there are still plenty of loose ends that could do with tidying up. One thing you might want to do is detach the child PHP processes from the parent process’ output sockets, thereby effectively sending them to the background. For inspiration take a look at this basic daemonising program: A simple daemon in C.

Another nice addition might be to allow the child process to be specified in command line arguments to the wrapper. It would be useful to specify the sleep time for threads and also the maximum number of concurrent threads – this way you wouldn’t have to recompile each time you changed something.

If you find this useful or interesting then comments would be greatly welcomed!

Categories: C, PHP Tags: , , , ,

Spinning command line cursor in Java and PHP

January 29th, 2010 Nick 1 comment

Update: I have created an updated article which describes a multithreaded approach in Java Spinning command line cursor in Java

I think I must be really bored this morning, I can’t believe I’m actually blogging this, but it might be useful for someone, who knows.   Anyway, I am currently writing a program that sits and does stuff for a very long time, and I need a way to nicely indicate the program is still running and doing its stuff.   So I have created a little method that prints a spinning cursor on the command line.   The implementation would of course have to be multithreaded; one thread to do stuff, the other to spin the cursor – if there’s time I’ll update it to a more complete solution, but for now here’s the spin implementation:


public class Spin
{
    public static void main(String[] args) throws InterruptedException
    {
        String[] phases = {"|", "/", "-", "\\"};

        System.out.printf("Spinning... |");

        while (true)
        {
            for (String phase : phases)
            {
                System.out.printf("\b"+phase);
                Thread.sleep(100);
            }
        }
    }
}

And here it is in PHP.

<?php

    $phases = array("|", "/", "-", "\\");

    printf("Spinning... |");

    while (1)
    {
        foreach ($phases AS $phase)
        {
            printf('%s%s', chr(8), $phase);
            usleep(100000); // Replace this with one iteration of doing stuff
        }
    }

Of course PHP doesn’t support threading so you’d have to call each iteration of the “doing stuff” loop inside the spin loop which would mean you’d get a bit of a jumpy spinning cursor but I think most people could live with that.

Categories: Java, PHP Tags: , , , , ,

Transparent aggregation in PHP5

October 17th, 2009 Nick 3 comments

The other day a Ruby developer was extolling the virtues of Ruby on Rails to me, in particular the use of “acts_as_” to make a class magically include all the behaviour from another. It isn’t inheritance, the relationship isn’t “is a” but rather “has a” – it’s a sort of transparent aggregation with all the methods and properties of the aggregated object magically available through the containing object. At least that’s what it looks like, from my somewhat limited RoR experience.

Personally, I would rather achieve this functionality by using the Strategy Pattern – it requires more boiler-plate code but I think the end result is much clearer. If you’re reading this via o2 mobile broadband or another portable connection to the web, then prepare to miss your stop on the train, as my temptations to re-work the functionality issues began to take over just lower down the page.

Anyway, not to be outdone, I set about implementing RoR’s acts_as in PHP.   My implementation works by using a class called Aggregatable which allows all child classes to aggregate, or “act_as”, other standard classes.   I’ll start with an example with two simple classes; Binman and Postman which each do different jobs.

Here are the classes:

class Binman
{
    public function collectRubbish()
    {
        printf("Collecting rubbish\n");
    }
}
class Postman
{
    public function deliverLetter()
    {
        printf("Delivering a letter\n");
    }
}

We can use the classes like this:

$binman = new Binman();
$binman->collectRubbish();    // Prints "Collecting rubbish"

$postman = new Postman();
$postman->deliverLetter();    // Prints "Delivering a letter"

So far so good, but what if I wanted a CasualWorker class to be able to both collect rubbish and deliver letters?   I could of course copy the methods from the Binman class and the Postman class into the CasualWorker class, but this would be rather inflexible – suppose if we needed to change the Binman behaviour we would have to update both the Binman and CasualWorker classes.   Instead we make the CasualWorker class a child of the Aggregatable class and aggregate the Binman and Postman classes.   That might sound a bit complex so I’ll illustrate with an example.

Here’s the CasualWorker class:

class CasualWorker extends Aggregatable
{
    public function __construct()
    {
        $this->aggregate("Binman");
        $this->aggregate("Postman");
    }
}

The CasualWorker class can now do everything that the Binman and Postman classes can:

$casualWorker = new CasualWorker();
$casualWorker->collectRubbish();    // Prints "Collecting rubbish"
$casualWorker->deliverLetter();    // Prints "Delivering a letter"

Great!   It works!   But how?

The Aggregatable class contains an array of instances of all the classes it has been asked to aggregate.   It also makes use of PHP’s magic method, __call(), so that when client code invokes a method that is not defined by it or the child class it is extending, it loops through the objects it is aggregating to see if any of those define the method.   If an object is found that defines the requested method then it is run and the result returned – just as if the invoked class defined the method.

class Aggregatable
{
    /**
     * Store of aggregated objects
     *
     */
    private $aggregated = array();

    /**
     * Aggregates objects
     *
     * @param string Classname
     */
    protected function aggregate($class)
    {
        // Check an instance of this class has not already been aggregated
        if (array_key_exists($class, $this->aggregated))
        {
            throw new Exception(sprintf("Class already aggregated: %s", $class));
        }

        // Add a new instance of the class to the store
        $this->aggregated[$class] = new $class();
    }

    /**
     * Magic method - catch calls to undefined methods
     *
     * @param String method name
     * @param array Arguments
     */
    public function __call($method, $arguments)
    {
        // Loop through the aggregated objects
        foreach ($this->aggregated as $subject)
        {
            // Check each object to see if it defines the method
            if (method_exists($subject, (string) $method))
            {
                // Object defines the requested method, so call it and return the result
                return call_user_func_array(array($subject, (string) $method), $arguments);
            }
        }

        throw new Exception(sprintf("Method not found: %s", $method));
    }
}

What about properties?

I’ve kept the example above fairly simple but there’s no reason why it couldn’t be extended to cover properties using the other Magic Functions __set(), __get() and __unset().   In fact, in the example in the download (see below) I’ve implemented basic support for properties.

Conclusion

Like I’ve said before, I’m not a big fan of this sort of “magic”.   It seems to me to break the golden rule of object oriented programmed, as mentioned in the Gang Of Four book; “program to an interface, not an implementation”.   The interface the class provides is defined by the public properties and methods available, or even better, from the Interfaces it implements.   By adding this sort of run-time dynamism, the client code can never be sure what the class can do, instanceof won’t help so you’re left with either making extra functions like bool isAggregating(classname) or reflection.   Admittedly magic is nice, and even if a isAggregating() method were to solve all the problems, I would strongly advise to stay away from such temptations.

To quote Matt Zandstra,

Magic is nice but clarity is much nicer.

Downloads

aggregation.zip
Here’s a zip file containing all the classes mentioned above plus support for properties.   It’s perhaps worth mentioning that this really isn’t production-ready, should you ever really want to use such a thing it will need a few tweaks first – namely how to handle adding two classes which define methods of the same name – which one has priority?   No doubt you’ll also want some inspection methods such as isAggregating() as previously mentioned.

PEAR Cache_Lite – efficient group cleaning

September 27th, 2009 Nick No comments

After using PEAR Cache_Lite for a while, we began to notice that as traffic increased the web servers spent more and more time thrashing their discs. On closer inspection we noticed that the servers were pretty much constantly parsing the entire cache directory structure.

Whenever you call Cache_Lite::clean() to remove a group of cached elements, it parses the entire cache directory structure looking for cache files which have the correct group hash in the filename. This was problematic for us because we stored a lot of data in groups, for example with messages – each page of messages for each user was stored in one group. Whenever someone sends a message, the system then deletes the cache group containing the recipient’s messages. As the cache directory structure increased in size, it took longer and longer to parse, and with increasing traffic the web servers were soon doing nothing but parsing the cache directory.

The solution I came up with was to prepend the name of each group with a number which was also cached. So when a request arrives for a cached item in group “messages”, the cache system looked for the cached group identifier number and prepended it to the group name, resulting in an internal group name like “1234_messages”.

The overhead is an extra cache “get”, but the advantage is that in order to expire a whole group you just have to increment the identifier number by one, (get, increment, save). So when the group is accessed again, the internal group name becomes “1235_messages”, which is not yet set, and so the application can regenerate the cache.

In my opinion this additional “get” is a price worth paying, especially as it’s a relatively very quick operation, and the time saved expiring a group is many times faster.

Finally

You might be thinking to yourself, “what about all those expired cache files just left on the disc?”. Well, we set a CRON job to run every day and delete all files older than 3 days. As none of the caches lasted longer than three days this was a safe duration.

In fact, we don’t use disc caching anywhere near as much as we did, now we use Memcached for most things, but for small and often used caches (such as IDs) we still use the disc cache as it’s by far the fastest.

PEAR Cache_Lite – preventing stampeding

September 3rd, 2009 Nick 1 comment

The PEAR Cache_Lite package is an excellent caching system; lightweight and fast, however when put into use on a high-traffic website a few issues came to light. The first problem we hit was stampeding.

What’s stampeding?

Stampeding is the situation when a request, let’s say from User1, arrives for a cached item that has expired. The cache system returns boolean false and the process of rebuilding that cached data begins, calling the database, formatting the data and so on.

If, during this process of rebuilding the cached data, another request arrives for the same cached item, let’s say from User2, another process of rebuilding the cached data begins. This is because the process started by User1 has not yet finished and so the cache system still returns boolean false when requested for the cached item.

So now we have two processes running, regenerating the same cache item. The situation can get out of hand if more and more requests for the same cache item arrive – causing the load on the web server or database to increase, and everything to potentially grind to a halt.

The solution

What’s required is for the cache system to know that a particular cache is being regenerated and therefore return the old cache until the new data has been regenerated. Thankfully this can be achieved very simply with the addition of just one extra line of code into the Cache_Lite class, Lite.php.

The trick is to touch() the cache file immediately after realising it has expired in the Cache_Lite::get() function. After touching the file, the get function will return false and the calling code will regenerate the cache data.

@touch($this->_file);

By touching the file, the modification time of the cache file is set to the current time and therefore all subsequent requests will think the cache is still valid and return the old data. Once the first process has regenerated the data, it saves it and the cache file once again contains up-to-date data.

Some finishing touches

By touching the cache, processes immediately following the one which is regenerating the fresh data will return out-of-date data, albeit by a matter of seconds – which in most cases really won’t matter nor be noticed.

If, however, something were to happen to the process regenerating the data, such as an uncaught exception, database timeout, etc that it would fail and not save the cache, then the old cache will be valid until it expires again – so it will have effectively been valid for twice its intended lifetime.

We can limit this by setting the modification time in the touch command to be the current time, minus the cache lifetime, plus 60s – which would mean that if the regenerating process were to fail, the cache would only be valid for another 60s.

@touch($this->_file, time() - abs($this->_lifeTime) + 60);