php-amqplib: Uncaught exception ‘Exception’ with message ‘Error reading data. Recevived 0 instead of expected 1 bytes’

I’ve been playing around with RabbitMQ recently, but finding out what caused the above error took a trip through Wireshark and an attempt to dig through the source code of php-amqplib. It seems that it’s (usually) caused by a permission problem: either a wrong username / password combination, as reported by several people around the internet, or, as in my case, an authenticated user that didn’t have access to the vhost I tried to associate my connection with.

You can see the active permissions for a vhost path by using rabbitmqctl:

sudo rabbitmqctl list_permissions -p /vhostname

.. or, if you’ve installed the web management plugin for RabbitMQ, select Virtual Hosts in the menu, then select the vhost you want to see permissions for.

You can give a user full access to the vhost (the three regexes are the configure, write and read permissions) by using rabbitmqctl:

sudo rabbitmqctl set_permissions -p /vhostname guest ".*" ".*" ".*"

.. or by adding the permissions through the web management interface, where you can select the user and the permission regexes for the user/vhost combination.

Gearman and Locking for Identical Jobs / Tasks

A question that came up on #gearman on freenode today was how to make sure that a task is only performed by one worker at a time (remember from our previous introduction to Gearman that a worker is the actual piece of code performing a task that has been submitted to gearmand).

I had a few naive suggestions:

Run memcache with a low timeout: add a key with a low timeout value when the task arrives, and if the add fails, simply return, as someone else is probably doing the task.

Add a function for each unique identification value that can be performed, and only register one worker for each function. (I like the memcache solution way better, but it’d work, at least for a bit.)

But neither of these is a good solution to the problem. Luckily Brian Moon also saw the question and was quick to point out that Gearman actually has a built-in mechanism for handling de-duplication of tasks. I’ve never used it myself (only read about it a couple of times), so it’s a good thing that Brian paid attention :-)

The solution: use gearman_job_unique. In the PHP extension this value (named $unique in the documentation) can be tacked on to the end of most methods that add tasks or perform tasks directly (such as the do* methods). If Gearman sees a unique value that a worker is already active for, it won’t resubmit the task, but will simply return the same result to both callers when the first worker returns. For a background task the second call just returns right away: when you’re counting on Gearman to de-duplicate your tasks, it makes no difference to the caller whether the task was just submitted or is already running.
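
A minimal sketch of how this could look from a PHP client, assuming the pecl gearman extension and a gearmand on localhost. The helper job_unique_key() is hypothetical (not part of Gearman); it only illustrates one way of deriving the same unique value for identical requests. The unique value is passed as the third argument to doBackground().

```php
<?php
// Hypothetical helper (not part of Gearman): derive a stable unique value
// from the function name and its arguments, so identical requests collide.
function job_unique_key(string $function, array $arguments): string
{
    ksort($arguments); // the same arguments in another order give the same key
    return $function . ':' . sha1(json_encode($arguments));
}

$arguments = ['url' => 'http://www.example.com/'];
$unique = job_unique_key('fetchURL', $arguments);

// Only talk to Gearman when the extension is actually available.
if (extension_loaded('gearman')) {
    $client = new GearmanClient();
    $client->addServer('localhost', 4730);

    // Third argument is the unique value; a second submission with the same
    // value while the first is still queued or running should be de-duplicated.
    $client->doBackground('fetchURL', json_encode($arguments), $unique);
}
```

Hashing the arguments keeps the unique value short and predictable no matter how large the workload is; whether that fits your tasks depends on what you consider "the same" task.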

So if you need to lock and exit, remember that Gearman has de-duplication of non-unique tasks built-in. I tend to forget.

A Gentle Introduction to Gearman and its Concepts

Gearman (an anagram for “Manager”) is a system for farming out work units to several different servers (or several processes on one server), allowing the calling code to do something completely different while the task is performed. Gearman is not intended for inter-process communication, but is a way to tell other processes that there is work available, and to let these processes (called workers) grab a piece of work for themselves.

One of the common themes that show up at the gearman IRC channel on freenode is an attempt to understand what gearman is and how everything fits together. I’ll try to explain the different concepts and what the different responsibilities of a working gearman infrastructure are. There’s also a “Getting Started” guide on the Gearman web site with a bit of example code and installation instructions, so you might want to keep that open in another tab. So here we go: a simple gearman tutorial explaining the concepts and not just throwing example code your way.

There are three core components of a gearman installation: a client (someone requesting a task to be performed), a worker (someone performing a task) and the server (which coordinates tasks between clients and workers). All three components need to be running for you to be able to do something useful with gearman. It’s worth noting that I’ll use the name “task” for a single item to be performed. You’ll also see this named “function” (which is the name of the actual function the task asks to be performed; a server offers several “functions” that a client can call). Some APIs might also refer to a “task” as a collection of functions to be called. I’ll use the first definition: a task is a call to a function on the server, together with the data for the task and a task identifier. Several subsequent tasks can call the same function.

I’ll go a bit more in detail about each of these components, but it’s important that you understand how everything is interconnected first. An exchange of messages between the different parts can be illustrated as follows:

client -> server: ask server to perform a task
server -> client: request acknowledged, here's the identifier assigned to it
server -> all workers: tell workers registered for the task that there is work to be performed
worker -> server: I'll perform the task you just told us about
server -> worker: ok, go ahead, here's the information about the task.
worker -> server: here's the result of the task performed
server -> client: here's the result of the task you asked me to get someone to do for you

The idea behind the server telling all the workers that there is work available is to let the worker that responds fastest actually get the task, as it’s assumed that this is the worker with the least load on the machine it’s running on (it responds quickly, so that machine is not busy doing other things). As I wrote above, the worker is the piece of code actually doing the work: the worker performs the task that a client has submitted to the gearman server.

You’ll find that most of Gearman is designed according to the same principle – keep stuff simple. The server only needs to keep track of which workers perform which functions, and then let the workers grab a task when it becomes available.

The Gearman Client

In Gearman the client is the piece of code that connects to the server and asks for a task to be performed. This can be a dynamic web page (running in python, ruby, PHP, perl or another language with a suitable Gearman library), a completely separate application that connects to Gearman, a worker (to submit a new task or to divide the current task into several smaller tasks to be performed by other workers) or a combination of the above. The important part is that this is simply a client: it has a task that needs to be handled, and it’ll ask the Gearman server to find someone who can perform the task.

The client can be run in synchronous (blocking) or asynchronous (non-blocking) mode. The first will make the client wait until the task has been performed by a worker (and if no worker is available, it’ll wait indefinitely or until reaching a timeout in the client), while the latter will simply fire-and-forget the task to the Gearman server (the server will confirm that the task has been received) and then go on its merry way afterwards. The Gearman server will provide a task identification value which the asynchronous client can use to query the current state of the task it asked to be performed (as long as the actual worker provides such updates).

A small example of how a client might work (using PHP):

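<?php
$client = new GearmanClient();
$client->addServer('localhost', 4730);

// Arguments for the worker, sent as JSON.
$arguments = array(
    'url' => 'http://www.example.com/',
);

// Queue the task as a background (asynchronous) task ...
$client->addTaskBackground('fetchURL', json_encode($arguments));

// ... and submit all queued tasks to the server.
$client->runTasks();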

This will submit a request to a Gearman server running on the same machine as the script, asking for the function “fetchURL” to be run, and including an array of arguments to the function (you could simply include just the URL, but I find that this way is easier to extend in the future – and using JSON for data exchange makes the worker code more programming language independent). This code uses addTaskBackground to submit the task to be performed in an asynchronous manner. We’re not interested in the result of this task in this particular piece of code – the worker will either provide the result through other means (storing it in a database, in memcache, call an API function telling us that it’s finished) or perhaps we’re not interested in the result at all, just that we’ve attempted to perform the task. If you’re using the synchronous interface, the data returned from the worker will be returned to your code as the return value from the client.

As you can see, the client code is very, very simple. There is no actual work being performed here, we’re just telling the server that we’d like some work to be performed for us.

The Gearman Worker

The Gearman worker is where all the actual work (.. who’d guess) is performed. This is the application that receives a notice that it has to wake up and do a bit of hard work, and which actually goes out and does just that. What kind of work it does depends on what you’re using Gearman for, but a few use cases could be resizing an image into smaller sizes (such as thumbnails), converting an uploaded video into another format for a specific device, sending notification e-mails or updating an internal search engine such as Solr. As long as the application doesn’t need the result of the task to continue running (no need to wait for an e-mail to be delivered if you’re going to show a “Your information has been saved” message), Gearman (and other alternative message queues) is a valid solution.

You’ll run each worker as its own process. A worker can perform several different functions (although you should (usually) stay away from multi-threading to perform them at the same time). If you want more than one task to be performed at the same time (i.e., if you want to send 30 e-mails in parallel), you’ll start several copies of the same worker, each as a separate process (30 workers in that case). There are several daemons and frameworks that can help you manage the number of processes available depending on server and task load, such as supervisord and GearmanManager (a PHP daemon). Another possible solution is to use screen to start several workers, which will also allow you to attach to the output of any worker at any time.

How the worker performs its work is up to the worker itself. In most cases you’ll have to write a bit of code to expose your code as a Gearman function (so that clients can submit tasks to perform that function), but this code will usually just instantiate the worker framework from the Gearman library you’re using, letting you register what functions you’ll be able to perform and attaching callbacks telling the library what part of your own code should be called when a request to perform a task arrives.

A simple example modified from the Gearman Getting Started guide:

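<?php
$worker = new GearmanWorker();
$worker->addServer("localhost", 4730);
$worker->addFunction("fetchURL", "fetch_url");

// Block until a job arrives, run the callback, then wait for the next job.
while ($worker->work());

function fetch_url($job)
{
    // Decode the JSON workload into an associative array (second argument: true).
    $arguments = json_decode($job->workload(), true);

    if (!empty($arguments['url']))
    {
        print("Fetching " . $arguments['url'] . "\n");
        return file_get_contents($arguments['url']);
    }
}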

The $worker->work() method call will wait until a job arrives, then execute the callback as defined in the addFunction call. addFunction instructs the worker to tell the gearman server that this worker is able to perform any tasks calling the “fetchURL” function. The callback provided to the library (“call this PHP function (‘fetch_url’) when tasks want to call ‘fetchURL’”) will then receive the job object containing information about the job (task) to be performed. The workload() method returns the workload: the information we included, in addition to which function to call, in the client example. The server receives the workload from the client and then sends it to the worker together with the task information.

Since our client calls the server using the asynchronous interface, it’ll not wait for the worker to return the web page contents. By using ->do() or one of the other foreground methods in the PHP Gearman library instead, the client would block until the worker finishes and receive the contents as the return value.
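
A foreground call could look something like the sketch below. It assumes a version of the pecl gearman extension that provides doNormal() (the newer name for the foreground do() call) and a gearmand on localhost. The helper fetch_via_gearman() and the five second timeout are made up for illustration.

```php
<?php
// Hypothetical helper: ask a worker to fetch a URL and wait for the result.
// Returns null when no result could be obtained.
function fetch_via_gearman(string $url, int $timeoutMs = 5000): ?string
{
    if (!extension_loaded('gearman')) {
        return null; // the pecl gearman extension is not installed
    }

    try {
        $client = new GearmanClient();
        $client->addServer('localhost', 4730);
        $client->setTimeout($timeoutMs); // give up instead of waiting forever

        // Foreground call: blocks until a worker returns or the timeout hits.
        $result = $client->doNormal('fetchURL', json_encode(['url' => $url]));

        return $client->returnCode() === GEARMAN_SUCCESS ? (string) $result : null;
    } catch (Throwable $e) {
        // Assumption: some extension versions report failures as exceptions
        // rather than through the return code, so both paths end up here.
        return null;
    }
}

$contents = fetch_via_gearman('http://www.example.com/');
echo $contents === null ? "No result\n" : strlen($contents) . " bytes fetched\n";
```

Setting a timeout is a design choice worth making early: without one, a synchronous client waits for as long as no worker is available.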

The Gearman Server

The Gearman server used is usually the C version of the server. There’s also a Perl version, but these days the C server is the one being actively developed. There’s not much to say about the server: you usually just start it and let it run by itself, doing what it was supposed to do all along.

I’ve got one simple suggestion if you’re just playing around with Gearman for the first time: start the server with the -vvv option. This will make gearmand a lot noisier, and will allow you to see clients registering themselves with the server, pinging the server and getting a bit more information about what’s happening inside the server process.

You’ll also want to provide an IP address that the gearman server should bind to – by default it binds to all interfaces, and since gearmand does not have any authentication built in by default, you don’t want to expose your server to the whole world.

Here’s an example of how we start gearmand at one of our servers:

screen -d -m -S gearmand /usr/local/sbin/gearmand -L 127.0.0.1 -p 4730 -vvv

You can drop the part related to screen if you just want to play with gearmand:

/usr/local/sbin/gearmand -L 127.0.0.1 -p 4730 -vvv

If you have gearmand in your path and not in the same location as us, drop /usr/local/sbin :-) This will bind gearmand to your localhost and use the default port (earlier the default port was something other than 4730, so we provide it just in case).

Making it all come together

The easiest way to play around with gearman is to simply open three terminal windows: one for gearmand with logging turned on, one for your worker and its output and the last window for a client sending a task request to gearmand (you can use the ‘gearman’ binary for this, just be sure to include any data in an appropriate format). As you submit a task for a function that the worker has registered, you should see it pick it up and then start processing the task as soon as possible. After a while (depending on how you’ve implemented your worker and what function it performs) the result should appear in your client.

Our production setups usually use a web application (PHP or python/django) as the client in the above scenario. The functions are usually long running tasks, such as analysing GPS paths, encoding videos and downloading files or internal web site analytics (where we just want to get things logged and not wait for the actual logging to complete). The web application submits a request to gearmand as soon as a file has been received, with a payload of the path to the file to be processed. The workers perform their function and then store the information back into the database or to disk, then usually call a web service to tell the web application that the work has been performed and any internal state can be updated to include (and show) the result of the task.

Message queues (such as Gearman) have become one of the core technologies behind many modern web applications (and non-web applications for that matter), so there’s really no reason to avoid at least playing around a bit with one and adding another possible tool to your future options.

Solr: Replication not starting?

After upgrading our Solr-servers from 1.4.1 to 4.0-trunk (to be sure we were ready for the next version), I had trouble with getting replication to start again. It worked perfectly back with 1.4.1, but after upgrading to 4.0-trunk, it simply wouldn’t start.

I had to upgrade the machines individually (to allow the current index to continue serving requests), so I removed the replication and then directed all the traffic to the slave. After updating the master (which worked after actually remembering to clean out the old webapps from Tomcat and adding a few new settings) and reindexing, most of the traffic was directed to it, and the slave was upgraded to the new Solr version. I turned on replication again, updated the configuration file with the needed settings and started the slave. Nothing happened. Weird.

Time to debug!

On any slave there’s a “replication.properties” file in the data directory ($SOLRHOME/data) which contains information about the current replication status. This file was created, indicating that the replication was at least attempting to run. If you open the file in a text editor (or just cat it), you should be able to read a bit of meta information about the replication state.

replicationFailedAtList=1311072270004,1311072240006..
timesFailed=11

Seems like it’s trying, but for some reason it doesn’t work. The first thing to check would be to grep for replication in the log on both the master and the slave, and see if there are any requests being made at all. There might be, but the replication still doesn’t start.

Try fetching the current state yourself to see what response the master is serving. You can do this by using “GET” or “wget” or “curl” to make an HTTP request to the master Solr-server from the slave together with the URL from “masterUrl” in the requestHandler for /replication from solrconfig.xml:

GET http://example.com/solr/replication?command=indexversion

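This should respond with something close to:

<?xml version="1.0" encoding="UTF-8"?>
<response>
  <lst name="responseHeader">
    <int name="status">0</int>
    <int name="QTime">0</int>
  </lst>
  <long name="indexversion">1310994445934</long>
  <long name="generation">2</long>
</response>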

If “indexversion” is 0, this means that the master hasn’t triggered a replication yet, which may seem weird if you’ve just started the server and the slave doesn’t have any data at all.
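
If you’d rather script this check than read the XML by eye, something like the following sketch could do it. The function name replication_index_version() is made up for this example, and the sample response is a hand-written stand-in shaped like the master’s reply, not captured output.

```php
<?php
// Extract the indexversion value from a replication "indexversion" response.
// Returns null when the response is not XML or has no indexversion element.
function replication_index_version(string $xml): ?int
{
    $doc = simplexml_load_string($xml);
    if ($doc === false) {
        return null; // not XML at all (error page, empty reply, ...)
    }
    $nodes = $doc->xpath('//long[@name="indexversion"]');
    return $nodes ? (int) (string) $nodes[0] : null;
}

// Hand-written sample, shaped like the response from the master.
$sample = <<<XML
<response>
  <lst name="responseHeader"><int name="status">0</int><int name="QTime">0</int></lst>
  <long name="indexversion">0</long>
  <long name="generation">0</long>
</response>
XML;

$version = replication_index_version($sample);
if ($version === 0) {
    echo "The master has not published an index version yet.\n";
}
```

Against a live master you could feed it the body of the same URL as above, for instance through file_get_contents(), instead of the sample string.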

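The reason might be that the master has not been instructed to actually trigger a replication event (and unless a replication event has been triggered, the indexversion will be 0):

<requestHandler name="/replication" class="solr.ReplicationHandler">
  <lst name="master">
    <str name="replicateAfter">commit</str>
    <str name="replicateAfter">startup</str>
    <str name="replicateAfter">optimize</str>
  </lst>
</requestHandler>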

If you only have “commit” in the above list, a replication event will not be triggered unless you’ve actually performed a commit after the slave has connected for the first time. If you add “startup”, the replication will also be triggered when the master starts up (so that any connecting slaves will start replicating right away).

To fix the issue without restarting any nodes, issue a single commit to the master and watch as the slaves start replicating. To issue a commit through curl:

curl http://example.com/solr/update -H "Content-Type: text/xml" --data-binary '<commit/>'

Solr, Memory Usage and Dynamic Fields

One of the many great things about Solr is that it allows you to add dynamic fields – you can define a certain pattern that a field will have to follow, but it can then use any field name that matches the pattern.

We’ve been using one such dynamic field to add a sort field for our documents:

xxx_Category_Subcategory: 300

This would allow us to sort by this field to get the priority of our documents in this particular category and subcategory. A document would contain somewhere between 1 and 15 such fields. The total selection of unique field names is somewhere around 1200 across all documents.

Be small, be happy

As long as our collection was quite small (<10k documents) this scheme worked great. When our collection grew to around 500k documents, we started seeing out of memory errors quite often. At the worst we got an out of memory error every 30 minutes, and had to restart the Solr server. Performance didn't suffer, but obviously we couldn't continue restarting servers until we got bored. After ruling out a few other possible issues (such as our stable random sort) I was rather stumped that things didn't improve.

The total amount of data in our dynamic fields was rather low, somewhere around 2.5 - 3.5m integers, or possibly somewhere around 50-70MB in total. The JVM should be able to fit everything about these fields in memory and query them for the fields we're trying to find, but a heap dump of the JVM just before it hit the out of memory error revealed that we were getting quite a few GBs of Lucene's FieldCache objects. These objects cache the value of a field for the total set of documents available in the index, and you're sadly not able to tune this cache through the Solr configuration (at least not for 1.4 as far as I could find).

Less Dynamic Fields, More Manual Labor

After pondering this issue a bit I came to the conclusion that our problem was related to the dynamic fields we had, and the fact that we used them for sorting. Lucene / Solr keeps one set of field caches for each field when it’s used for sorting, to avoid having to do duplicate work later. For us, this meant that each time we sorted a new field, an array had to be created with the size of the total document set. As long as we just had 10k documents, these arrays were small enough that we had enough memory available – when the document set grew to almost 500k documents, not so much.

This means that the total memory required for field caches is bounded by DocumentsInIndex * FieldsSortedBy (times the size of each cached value). As long as DocumentsInIndex was just 10k, the memory available to the JVM was enough to keep sorting by the number of fields we did. When the number of documents grew, the memory usage grew by the same factor and we got our OutOfMemoryError.
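
To put rough numbers on that, here is a back-of-the-envelope calculation. It assumes 4 bytes per cached value (a plain integer array) and the worst case where every one of the roughly 1200 unique field names has been sorted on at least once. Both are assumptions for illustration, not measurements from our heap dump.

```php
<?php
// Upper bound for field cache memory: one value per document for every
// field that has been used for sorting.
function field_cache_bytes(int $documents, int $sortedFields, int $bytesPerValue = 4): int
{
    return $documents * $sortedFields * $bytesPerValue;
}

$small = field_cache_bytes(10000, 1200);   // 48,000,000 bytes
$large = field_cache_bytes(500000, 1200);  // 2,400,000,000 bytes

printf("10k documents:  %.1f MB\n", $small / (1024 ** 2));
printf("500k documents: %.2f GB\n", $large / (1024 ** 3));
```

Under these assumptions the small index needs less than 50 MB for its field caches, while the large one needs more than 2 GB for exactly the same queries, even though the actual values stored in the fields only add up to a few million integers.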

The Solution

Our solution could probably be more elegant, but currently we’ve moved the sorting to our application layer instead of the data provider layer. We’re requesting the complete set of hits from the Solr-server in the category anyway, so we’re able to sort it in the application – and by using a response format other than XML we’re also doing it rather quickly. This means that we’re not using sorting at all, and are only querying against one multivalued field to see if the category key is present there at all.
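
Sorting in the application can be as small as the sketch below. The document layout is hypothetical: it assumes each hit carries its per-category priorities in a “priorities” map, and that a higher value should come first. Adjust both to whatever your own documents actually look like.

```php
<?php
// Sort documents by their priority within one category, highest first.
// Documents without a priority for that category end up last.
function sort_by_category_priority(array $documents, string $categoryKey): array
{
    usort($documents, function (array $a, array $b) use ($categoryKey): int {
        $pa = $a['priorities'][$categoryKey] ?? PHP_INT_MIN;
        $pb = $b['priorities'][$categoryKey] ?? PHP_INT_MIN;
        return $pb <=> $pa; // descending order
    });
    return $documents;
}

// Hypothetical hits, as decoded from the Solr response.
$hits = [
    ['id' => 'a', 'priorities' => ['Category_Subcategory' => 100]],
    ['id' => 'b', 'priorities' => ['Category_Subcategory' => 300]],
    ['id' => 'c', 'priorities' => []],
];

$sorted = sort_by_category_priority($hits, 'Category_Subcategory');
echo implode(', ', array_column($sorted, 'id')), "\n"; // prints "b, a, c"
```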

Note: Another solution we considered was to divide our index into several Solr cores. This would allow us to keep the number of documents in each core low, and therefore also keep the field cache size in check. We know that each category could very well live on just one core, as we won’t be mixing it with data from the other cores (and for that we could keep a separate core with all the documents, just not use it for searching across dynamic fields). We dropped this plan because of the rather worrying increase in complexity in our Solr installation. This could however help in your own case. :-)

One Possible Reason for Missing Munin Graphs

We’re currently expanding our munin reporting cluster at Derdubor, but after installing munin-node on one of our servers we never got any graphs. The only section available on the munin server was “Other”, and that didn’t contain any information at all (which indicates that you’re not getting any response from the server).

The first step I take when trying to debug a munin connection is to telnet into the munin port, as this confirms that the two servers are able to talk to each other and that the munin daemon listens on the correct interface and port.

# telnet localhost 4949
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Connection closed by foreign host.
#

The connection was established, but then munin closed the connection as soon as it was created. This usually means one thing: the host you’re connecting from hasn’t been added to the cidr_allow list or the allow list, or it’s in the denied hosts list. This time it was neither: the host was added and we didn’t have any denied hosts list.

The next step was to take a look at the munin-node.log in /var/log/munin (at least under debian).

The last message was:


User "ejabberd" in configuration file "/etc/munin/plugin-conf.d/munin-node" nonexistant. Skipping plugin. at /usr/sbin/munin-node line 615, line 83.
Something wicked happened while reading "/etc/munin/plugins/munin-node". Check the previous log lines for spesifics. at /usr/sbin/munin-node line 261, line 83.

We don’t have ejabberd installed, but the ejabberd config reference was apparently added to the configuration file in /etc/munin/plugin-conf.d/munin-node. This made our version of munin-node barf, as the user it referenced wasn’t available.

The next step was to remove the section from the file and restart munin-node:

/etc/init.d/munin-node restart

After restarting munin, I did the telnet check again:

# telnet localhost 4949
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
# munin node at example.com
.
fetch load
load.value 0.02
.
quit
Connection closed by foreign host.
#

Wait 10 – 15 minutes and you should start seeing graphs again, if this actually was your problem. It probably wasn’t (and then you should probably read Debugging Munin Plugins and other documentation on the Wiki). But if it was, you’ll be happy happy joy joy now.

Writing a Munin Plugin

I have to admit something. I’ve become addicted.

One of the things I finally got around to doing while living the quiet life over the Christmas holiday was to dive a bit further into Munin – a simple framework for collecting information from your computers and servers and making nice graphs that you can watch while you’re bored.

I’m not going to write a lot about how you can create your own Munin plugin to create your own graphs, as they have a very simple tutorial themselves, giving you all the basics about writing Munin plugins. The only things you need to remember are these two tidbits:

  1. When Munin first registers your plugin, it runs your script with config as the only argument. This provides Munin with the name of the graph, the labels and names (keys) of the graphs you’re providing values for, information about the axis, etc.
  2. When Munin runs your script without the config argument, it expects you to give it values for the keys you provided it in the configuration.
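
Those two modes are really all there is to it. A bare-bones plugin in PHP could look like the sketch below. The graph (“Pending jobs”), the field name “pending” and the hard-coded value are all made up for illustration; a real plugin would fetch the value from whatever it monitors.

```php
<?php
// Build the plugin output for the given command line arguments.
function munin_plugin_output(array $argv): string
{
    // Mode 1: "config" describes the graph and the fields we report.
    if (($argv[1] ?? '') === 'config') {
        return "graph_title Pending jobs\n"
             . "graph_vlabel jobs\n"
             . "pending.label pending\n";
    }

    // Mode 2: no argument means "give me the current values".
    $pending = 3; // hypothetical value; fetch the real number here
    return "pending.value $pending\n";
}

echo munin_plugin_output($_SERVER['argv'] ?? []);
```

Keeping the output in a function that takes the argument list makes it easy to run both modes by hand before symlinking the plugin into place.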

You enable and disable plugins by creating symlinks in /etc/munin/plugins (at least under debian / ubuntu), and plugins are usually stored in /usr/share/munin/plugins.

I keep my plugins archived together with the rest of the repository for my web projects, and then either symlink the content into the plugins-directory or create a simple wrapper script that changes the current directory to the location of the script and then invokes it (to make the current working directory be correct).

A very simple bash script that does this – and passes through any parameters given to the script:

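#!/bin/bash
# The directory and script name below are placeholders; substitute your own.
cd /path/to/plugin/directory && php ./plugin-script.php "$@"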