MessageChannel.h

00001 /*
00002  *  Phusion Passenger - http://www.modrails.com/
00003  *  Copyright (C) 2008  Phusion
00004  *
00005  *  Phusion Passenger is a trademark of Hongli Lai & Ninh Bui.
00006  *
00007  *  This program is free software; you can redistribute it and/or modify
00008  *  it under the terms of the GNU General Public License as published by
00009  *  the Free Software Foundation; version 2 of the License.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License along
00017  *  with this program; if not, write to the Free Software Foundation, Inc.,
00018  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00019  */
00020 #ifndef _PASSENGER_MESSAGE_CHANNEL_H_
00021 #define _PASSENGER_MESSAGE_CHANNEL_H_
00022 
00023 #include <algorithm>
00024 #include <string>
00025 #include <list>
00026 #include <vector>
00027 
00028 #include <sys/types.h>
00029 #include <sys/socket.h>
00030 #include <arpa/inet.h>
00031 #include <errno.h>
00032 #include <unistd.h>
00033 #include <cstdarg>
00034 
00035 #include "System.h"
00036 #include "Exceptions.h"
00037 #include "Utils.h"
00038 
00039 namespace Passenger {
00040 
00041 using namespace std;
00042 
00043 /**
00044  * Convenience class for I/O operations on file descriptors.
00045  *
00046  * This class provides convenience methods for:
00047  *  - sending and receiving raw data over a file descriptor.
00048  *  - sending and receiving messages over a file descriptor.
00049  *  - file descriptor passing over a Unix socket.
00050  * All of these methods use exceptions for error reporting.
00051  *
00052  * There are two kinds of messages:
00053  *  - Array messages. These are just a list of strings, and the message
00054  *    itself has a specific length. The contained strings may not
00055  *    contain NUL characters (<tt>'\\0'</tt>). Note that an array message
00056  *    must have at least one element.
00057  *  - Scalar messages. These are byte strings which may contain arbitrary
00058  *    binary data. Scalar messages also have a specific length.
00059  * The protocol is designed to be low overhead, easy to implement and
00060  * easy to parse.
00061  *
00062  * MessageChannel is to be wrapped around a file descriptor. For example:
00063  * @code
00064  *    int p[2];
00065  *    pipe(p);
00066  *    MessageChannel channel1(p[0]);
00067  *    MessageChannel channel2(p[1]);
00068  *    
00069  *    // Send an array message.
00070  *    channel2.write("hello", "world !!", NULL);
00071  *    vector<string> args;
00072  *    channel1.read(args);    // args now contains { "hello", "world !!" }
00073  *
00074  *    // Send a scalar message.
00075  *    channel2.writeScalar("some long string which can contain arbitrary binary data");
00076  *    string str;
00077  *    channel1.readScalar(str);
00078  * @endcode
00079  *
00080  * The life time of a MessageChannel is independent from that of the
00081  * wrapped file descriptor. If a MessageChannel object is destroyed,
00082  * the file descriptor is not automatically closed. Call close()
00083  * if you want to close the file descriptor.
00084  *
00085  * @note I/O operations are not buffered.
00086  * @note Be careful with mixing the sending/receiving of array messages,
00087  *    scalar messages and file descriptors. If you send a collection of any
00088  *    of these in a specific order, then the receiving side must receive them
00089  *    in the exact same order. So suppose you first send a message, then a
00090  *    file descriptor, then a scalar, then the receiving side must first
00091  *    receive a message, then a file descriptor, then a scalar. If the
00092  *    receiving side does things in the wrong order then bad things will
00093  *    happen.
00094  * @note MessageChannel is not thread-safe, but is reentrant.
00095  *
00096  * @ingroup Support
00097  */
00098 class MessageChannel {
00099 private:
00100         const static char DELIMITER = '\0';
00101         int fd;
00102 
00103 public:
00104         /**
00105          * Construct a new MessageChannel with no underlying file descriptor.
00106          * Thus the resulting MessageChannel object will not be usable.
00107          * This constructor exists to allow one to declare an "empty"
00108          * MessageChannel variable which is to be initialized later.
00109          */
00110         MessageChannel() {
00111                 this->fd = -1;
00112         }
00113 
00114         /**
00115          * Construct a new MessageChannel with the given file descriptor.
00116          */
00117         MessageChannel(int fd) {
00118                 this->fd = fd;
00119         }
00120         
00121         /**
00122          * Close the underlying file descriptor. If this method is called multiple
00123          * times, the file descriptor will only be closed the first time.
00124          *
00125          * @throw SystemException
00126          * @throw boost::thread_interrupted
00127          */
00128         void close() {
00129                 if (fd != -1) {
00130                         int ret = InterruptableCalls::close(fd);
00131                         if (ret == -1) {
00132                                 throw SystemException("Cannot close file descriptor", errno);
00133                         }
00134                         fd = -1;
00135                 }
00136         }
00137 
00138         /**
00139          * Send an array message, which consists of the given elements, over the underlying
00140          * file descriptor.
00141          *
00142          * @param args The message elements.
00143          * @throws SystemException An error occured while writing the data to the file descriptor.
00144          * @throws boost::thread_interrupted
00145          * @pre None of the message elements may contain a NUL character (<tt>'\\0'</tt>).
00146          * @see read(), write(const char *, ...)
00147          */
00148         void write(const list<string> &args) {
00149                 list<string>::const_iterator it;
00150                 string data;
00151                 uint16_t dataSize = 0;
00152 
00153                 for (it = args.begin(); it != args.end(); it++) {
00154                         dataSize += it->size() + 1;
00155                 }
00156                 data.reserve(dataSize + sizeof(dataSize));
00157                 dataSize = htons(dataSize);
00158                 data.append((const char *) &dataSize, sizeof(dataSize));
00159                 for (it = args.begin(); it != args.end(); it++) {
00160                         data.append(*it);
00161                         data.append(1, DELIMITER);
00162                 }
00163                 
00164                 writeRaw(data);
00165         }
00166         
00167         /**
00168          * Send an array message, which consists of the given strings, over the underlying
00169          * file descriptor.
00170          *
00171          * @param name The first element of the message to send.
00172          * @param ... Other elements of the message. These *must* be strings, i.e. of type char*.
00173          *            It is also required to terminate this list with a NULL.
00174          * @throws SystemException An error occured while writing the data to the file descriptor.
00175          * @throws boost::thread_interrupted
00176          * @pre None of the message elements may contain a NUL character (<tt>'\\0'</tt>).
00177          * @see read(), write(const list<string> &)
00178          */
00179         void write(const char *name, ...) {
00180                 list<string> args;
00181                 args.push_back(name);
00182                 
00183                 va_list ap;
00184                 va_start(ap, name);
00185                 while (true) {
00186                         const char *arg = va_arg(ap, const char *);
00187                         if (arg == NULL) {
00188                                 break;
00189                         } else {
00190                                 args.push_back(arg);
00191                         }
00192                 }
00193                 va_end(ap);
00194                 write(args);
00195         }
00196         
00197         /**
00198          * Send a scalar message over the underlying file descriptor.
00199          *
00200          * @param str The scalar message's content.
00201          * @throws SystemException An error occured while writing the data to the file descriptor.
00202          * @throws boost::thread_interrupted
00203          * @see readScalar(), writeScalar(const char *, unsigned int)
00204          */
00205         void writeScalar(const string &str) {
00206                 writeScalar(str.c_str(), str.size());
00207         }
00208         
00209         /**
00210          * Send a scalar message over the underlying file descriptor.
00211          *
00212          * @param data The scalar message's content.
00213          * @param size The number of bytes in <tt>data</tt>.
00214          * @pre <tt>data != NULL</tt>
00215          * @throws SystemException An error occured while writing the data to the file descriptor.
00216          * @throws boost::thread_interrupted
00217          * @see readScalar(), writeScalar(const string &)
00218          */
00219         void writeScalar(const char *data, unsigned int size) {
00220                 uint32_t l = htonl(size);
00221                 writeRaw((const char *) &l, sizeof(uint32_t));
00222                 writeRaw(data, size);
00223         }
00224         
00225         /**
00226          * Send a block of data over the underlying file descriptor.
00227          * This method blocks until everything is sent.
00228          *
00229          * @param data The data to send.
00230          * @param size The number of bytes in <tt>data</tt>.
00231          * @pre <tt>data != NULL</tt>
00232          * @throws SystemException An error occured while writing the data to the file descriptor.
00233          * @throws boost::thread_interrupted
00234          * @see readRaw()
00235          */
00236         void writeRaw(const char *data, unsigned int size) {
00237                 ssize_t ret;
00238                 unsigned int written = 0;
00239                 do {
00240                         ret = InterruptableCalls::write(fd, data + written, size - written);
00241                         if (ret == -1) {
00242                                 throw SystemException("write() failed", errno);
00243                         } else {
00244                                 written += ret;
00245                         }
00246                 } while (written < size);
00247         }
00248         
00249         /**
00250          * Send a block of data over the underlying file descriptor.
00251          * This method blocks until everything is sent.
00252          *
00253          * @param data The data to send.
00254          * @pre <tt>data != NULL</tt>
00255          * @throws SystemException An error occured while writing the data to the file descriptor.
00256          * @throws boost::thread_interrupted
00257          */
00258         void writeRaw(const string &data) {
00259                 writeRaw(data.c_str(), data.size());
00260         }
00261         
00262         /**
00263          * Pass a file descriptor. This only works if the underlying file
00264          * descriptor is a Unix socket.
00265          *
00266          * @param fileDescriptor The file descriptor to pass.
00267          * @throws SystemException Something went wrong during file descriptor passing.
00268          * @throws boost::thread_interrupted
00269          * @pre <tt>fileDescriptor >= 0</tt>
00270          * @see readFileDescriptor()
00271          */
00272         void writeFileDescriptor(int fileDescriptor) {
00273                 struct msghdr msg;
00274                 struct iovec vec;
00275                 char dummy[1];
00276                 #ifdef __APPLE__
00277                         struct {
00278                                 struct cmsghdr header;
00279                                 int fd;
00280                         } control_data;
00281                 #else
00282                         char control_data[CMSG_SPACE(sizeof(int))];
00283                 #endif
00284                 struct cmsghdr *control_header;
00285                 int ret;
00286         
00287                 msg.msg_name = NULL;
00288                 msg.msg_namelen = 0;
00289         
00290                 /* Linux and Solaris require msg_iov to be non-NULL. */
00291                 dummy[0]       = '\0';
00292                 vec.iov_base   = dummy;
00293                 vec.iov_len    = sizeof(dummy);
00294                 msg.msg_iov    = &vec;
00295                 msg.msg_iovlen = 1;
00296         
00297                 msg.msg_control    = (caddr_t) &control_data;
00298                 msg.msg_controllen = sizeof(control_data);
00299                 msg.msg_flags      = 0;
00300                 
00301                 control_header = CMSG_FIRSTHDR(&msg);
00302                 control_header->cmsg_level = SOL_SOCKET;
00303                 control_header->cmsg_type  = SCM_RIGHTS;
00304                 #ifdef __APPLE__
00305                         control_header->cmsg_len = sizeof(control_data);
00306                         control_data.fd = fileDescriptor;
00307                 #else
00308                         control_header->cmsg_len = CMSG_LEN(sizeof(int));
00309                         memcpy(CMSG_DATA(control_header), &fileDescriptor, sizeof(int));
00310                 #endif
00311                 
00312                 ret = InterruptableCalls::sendmsg(fd, &msg, 0);
00313                 if (ret == -1) {
00314                         throw SystemException("Cannot send file descriptor with sendmsg()", errno);
00315                 }
00316         }
00317         
00318         /**
00319          * Read an array message from the underlying file descriptor.
00320          *
00321          * @param args The message will be put in this variable.
00322          * @return Whether end-of-file has been reached. If so, then the contents
00323          *         of <tt>args</tt> will be undefined.
00324          * @throws SystemException If an error occured while receiving the message.
00325          * @throws boost::thread_interrupted
00326          * @see write()
00327          */
00328         bool read(vector<string> &args) {
00329                 uint16_t size;
00330                 int ret;
00331                 unsigned int alreadyRead = 0;
00332                 
00333                 do {
00334                         ret = InterruptableCalls::read(fd, (char *) &size + alreadyRead, sizeof(size) - alreadyRead);
00335                         if (ret == -1) {
00336                                 throw SystemException("read() failed", errno);
00337                         } else if (ret == 0) {
00338                                 return false;
00339                         }
00340                         alreadyRead += ret;
00341                 } while (alreadyRead < sizeof(size));
00342                 size = ntohs(size);
00343                 
00344                 string buffer;
00345                 args.clear();
00346                 buffer.reserve(size);
00347                 while (buffer.size() < size) {
00348                         char tmp[1024 * 8];
00349                         ret = InterruptableCalls::read(fd, tmp, min(size - buffer.size(), sizeof(tmp)));
00350                         if (ret == -1) {
00351                                 throw SystemException("read() failed", errno);
00352                         } else if (ret == 0) {
00353                                 return false;
00354                         }
00355                         buffer.append(tmp, ret);
00356                 }
00357                 
00358                 if (!buffer.empty()) {
00359                         string::size_type start = 0, pos;
00360                         const string &const_buffer(buffer);
00361                         while ((pos = const_buffer.find('\0', start)) != string::npos) {
00362                                 args.push_back(const_buffer.substr(start, pos - start));
00363                                 start = pos + 1;
00364                         }
00365                 }
00366                 return true;
00367         }
00368         
00369         /**
00370          * Read a scalar message from the underlying file descriptor.
00371          *
00372          * @param output The message will be put in here.
00373          * @returns Whether end-of-file was reached during reading.
00374          * @throws SystemException An error occured while writing the data to the file descriptor.
00375          * @throws boost::thread_interrupted
00376          * @see writeScalar()
00377          */
00378         bool readScalar(string &output) {
00379                 uint32_t size;
00380                 unsigned int remaining;
00381                 
00382                 if (!readRaw(&size, sizeof(uint32_t))) {
00383                         return false;
00384                 }
00385                 size = ntohl(size);
00386                 
00387                 output.clear();
00388                 output.reserve(size);
00389                 remaining = size;
00390                 while (remaining > 0) {
00391                         char buf[1024 * 32];
00392                         unsigned int blockSize = min((unsigned int) sizeof(buf), remaining);
00393                         
00394                         if (!readRaw(buf, blockSize)) {
00395                                 return false;
00396                         }
00397                         output.append(buf, blockSize);
00398                         remaining -= blockSize;
00399                 }
00400                 return true;
00401         }
00402         
00403         /**
00404          * Read exactly <tt>size</tt> bytes of data from the underlying file descriptor,
00405          * and put the result in <tt>buf</tt>. If end-of-file has been reached, or if
00406          * end-of-file was encountered before <tt>size</tt> bytes have been read, then
00407          * <tt>false</tt> will be returned. Otherwise (i.e. if the read was successful),
00408          * <tt>true</tt> will be returned.
00409          *
00410          * @param buf The buffer to place the read data in. This buffer must be at least
00411          *            <tt>size</tt> bytes long.
00412          * @param size The number of bytes to read.
00413          * @return Whether reading was successful or whether EOF was reached.
00414          * @pre buf != NULL
00415          * @throws SystemException Something went wrong during reading.
00416          * @throws boost::thread_interrupted
00417          * @see writeRaw()
00418          */
00419         bool readRaw(void *buf, unsigned int size) {
00420                 ssize_t ret;
00421                 unsigned int alreadyRead = 0;
00422                 
00423                 while (alreadyRead < size) {
00424                         ret = InterruptableCalls::read(fd, (char *) buf + alreadyRead, size - alreadyRead);
00425                         if (ret == -1) {
00426                                 throw SystemException("read() failed", errno);
00427                         } else if (ret == 0) {
00428                                 return false;
00429                         } else {
00430                                 alreadyRead += ret;
00431                         }
00432                 }
00433                 return true;
00434         }
00435         
00436         /**
00437          * Receive a file descriptor, which had been passed over the underlying
00438          * file descriptor.
00439          *
00440          * @return The passed file descriptor.
00441          * @throws SystemException If something went wrong during the
00442          *            receiving of a file descriptor. Perhaps the underlying
00443          *            file descriptor isn't a Unix socket.
00444          * @throws IOException Whatever was received doesn't seem to be a
00445          *            file descriptor.
00446          * @throws boost::thread_interrupted
00447          */
00448         int readFileDescriptor() {
00449                 struct msghdr msg;
00450                 struct iovec vec;
00451                 char dummy[1];
00452                 #ifdef __APPLE__
00453                         // File descriptor passing macros (CMSG_*) seem to be broken
00454                         // on 64-bit MacOS X. This structure works around the problem.
00455                         struct {
00456                                 struct cmsghdr header;
00457                                 int fd;
00458                         } control_data;
00459                         #define EXPECTED_CMSG_LEN sizeof(control_data)
00460                 #else
00461                         char control_data[CMSG_SPACE(sizeof(int))];
00462                         #define EXPECTED_CMSG_LEN CMSG_LEN(sizeof(int))
00463                 #endif
00464                 struct cmsghdr *control_header;
00465                 int ret;
00466 
00467                 msg.msg_name    = NULL;
00468                 msg.msg_namelen = 0;
00469                 
00470                 dummy[0]       = '\0';
00471                 vec.iov_base   = dummy;
00472                 vec.iov_len    = sizeof(dummy);
00473                 msg.msg_iov    = &vec;
00474                 msg.msg_iovlen = 1;
00475 
00476                 msg.msg_control    = (caddr_t) &control_data;
00477                 msg.msg_controllen = sizeof(control_data);
00478                 msg.msg_flags      = 0;
00479                 
00480                 ret = InterruptableCalls::recvmsg(fd, &msg, 0);
00481                 if (ret == -1) {
00482                         throw SystemException("Cannot read file descriptor with recvmsg()", errno);
00483                 }
00484                 
00485                 control_header = CMSG_FIRSTHDR(&msg);
00486                 if (control_header->cmsg_len   != EXPECTED_CMSG_LEN
00487                  || control_header->cmsg_level != SOL_SOCKET
00488                  || control_header->cmsg_type  != SCM_RIGHTS) {
00489                         throw IOException("No valid file descriptor received.");
00490                 }
00491                 #ifdef __APPLE__
00492                         return control_data.fd;
00493                 #else
00494                         return *((int *) CMSG_DATA(control_header));
00495                 #endif
00496         }
00497 };
00498 
00499 } // namespace Passenger
00500 
00501 #endif /* _PASSENGER_MESSAGE_CHANNEL_H_ */

Generated on Fri Jan 23 08:28:57 2009 for Passenger by  doxygen 1.4.7