00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_STANDARD_APPLICATION_POOL_H_
00021 #define _PASSENGER_STANDARD_APPLICATION_POOL_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/weak_ptr.hpp>
00025 #include <boost/thread.hpp>
00026 #include <boost/bind.hpp>
00027 #include <boost/date_time/microsec_time_clock.hpp>
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029
00030 #include <string>
00031 #include <sstream>
00032 #include <map>
00033 #include <list>
00034
00035 #include <sys/types.h>
00036 #include <sys/stat.h>
00037 #include <stdio.h>
00038 #include <unistd.h>
00039 #include <ctime>
00040 #include <cerrno>
00041 #ifdef TESTING_APPLICATION_POOL
00042 #include <cstdlib>
00043 #endif
00044
00045 #include "ApplicationPool.h"
00046 #include "Logging.h"
00047 #include "System.h"
00048 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00049 #include "DummySpawnManager.h"
00050 #else
00051 #include "SpawnManager.h"
00052 #endif
00053
00054 namespace Passenger {
00055
00056 using namespace std;
00057 using namespace boost;
00058
00059 class ApplicationPoolServer;
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 class StandardApplicationPool: public ApplicationPool {
00093 private:
00094 static const int DEFAULT_MAX_IDLE_TIME = 120;
00095 static const int DEFAULT_MAX_POOL_SIZE = 20;
00096 static const int DEFAULT_MAX_INSTANCES_PER_APP = 0;
00097 static const int CLEANER_THREAD_STACK_SIZE = 1024 * 128;
00098 static const unsigned int MAX_GET_ATTEMPTS = 10;
00099 static const unsigned int GET_TIMEOUT = 5000;
00100
00101 friend class ApplicationPoolServer;
00102 struct AppContainer;
00103
00104 typedef shared_ptr<AppContainer> AppContainerPtr;
00105 typedef list<AppContainerPtr> AppContainerList;
00106 typedef shared_ptr<AppContainerList> AppContainerListPtr;
00107 typedef map<string, AppContainerListPtr> ApplicationMap;
00108
00109 struct AppContainer {
00110 ApplicationPtr app;
00111 time_t lastUsed;
00112 unsigned int sessions;
00113 AppContainerList::iterator iterator;
00114 AppContainerList::iterator ia_iterator;
00115 };
00116
00117 struct SharedData {
00118 mutex lock;
00119 condition activeOrMaxChanged;
00120
00121 ApplicationMap apps;
00122 unsigned int max;
00123 unsigned int count;
00124 unsigned int active;
00125 unsigned int maxPerApp;
00126 AppContainerList inactiveApps;
00127 map<string, time_t> restartFileTimes;
00128 map<string, unsigned int> appInstanceCount;
00129 };
00130
00131 typedef shared_ptr<SharedData> SharedDataPtr;
00132
00133 struct SessionCloseCallback {
00134 SharedDataPtr data;
00135 weak_ptr<AppContainer> container;
00136
00137 SessionCloseCallback(SharedDataPtr data,
00138 const weak_ptr<AppContainer> &container) {
00139 this->data = data;
00140 this->container = container;
00141 }
00142
00143 void operator()() {
00144 mutex::scoped_lock l(data->lock);
00145 AppContainerPtr container(this->container.lock());
00146
00147 if (container == NULL) {
00148 return;
00149 }
00150
00151 ApplicationMap::iterator it;
00152 it = data->apps.find(container->app->getAppRoot());
00153 if (it != data->apps.end()) {
00154 AppContainerListPtr list(it->second);
00155 container->lastUsed = time(NULL);
00156 container->sessions--;
00157 if (container->sessions == 0) {
00158 list->erase(container->iterator);
00159 list->push_front(container);
00160 container->iterator = list->begin();
00161 data->inactiveApps.push_back(container);
00162 container->ia_iterator = data->inactiveApps.end();
00163 container->ia_iterator--;
00164 data->active--;
00165 data->activeOrMaxChanged.notify_all();
00166 }
00167 }
00168 }
00169 };
00170
00171 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00172 DummySpawnManager spawnManager;
00173 #else
00174 SpawnManager spawnManager;
00175 #endif
00176 SharedDataPtr data;
00177 thread *cleanerThread;
00178 bool detached;
00179 bool done;
00180 bool useGlobalQueue;
00181 unsigned int maxIdleTime;
00182 unsigned int waitingOnGlobalQueue;
00183 condition cleanerThreadSleeper;
00184
00185
00186 mutex &lock;
00187 condition &activeOrMaxChanged;
00188 ApplicationMap &apps;
00189 unsigned int &max;
00190 unsigned int &count;
00191 unsigned int &active;
00192 unsigned int &maxPerApp;
00193 AppContainerList &inactiveApps;
00194 map<string, time_t> &restartFileTimes;
00195 map<string, unsigned int> &appInstanceCount;
00196
00197
00198
00199
/**
 * Debug-only consistency check of the pool's bookkeeping.
 *
 * When PASSENGER_DEBUG is set this asserts that:
 *  - no application has an empty instance list;
 *  - within each list, once an instance with open sessions is seen, every
 *    following instance also has open sessions;
 *  - active <= count, and inactiveApps holds exactly count - active entries.
 *
 * Without PASSENGER_DEBUG the function does nothing.
 *
 * @return true if no assertion fired. NOTE(review): P_ASSERT is defined
 *         elsewhere; presumably it returns its second argument (false)
 *         on failure -- confirm.
 */
bool inline verifyState() {
#if PASSENGER_DEBUG
	ApplicationMap::const_iterator it;
	for (it = apps.begin(); it != apps.end(); it++) {
		AppContainerList *list = it->second.get();
		P_ASSERT(!list->empty(), false, "List for '" << it->first << "' is nonempty.");

		AppContainerList::const_iterator prev_lit;
		AppContainerList::const_iterator lit;
		prev_lit = list->begin();
		lit = prev_lit;
		lit++;
		for (; lit != list->end(); lit++) {
			if ((*prev_lit)->sessions > 0) {
				P_ASSERT((*lit)->sessions > 0, false,
					"List for '" << it->first <<
					"' is sorted from nonactive to active");
			}
			// Advance the trailing iterator so that every adjacent
			// pair is compared, not just (first, lit).
			prev_lit = lit;
		}
	}

	P_ASSERT(active <= count, false,
		"active (" << active << ") <= count (" << count << ")");
	P_ASSERT(inactiveApps.size() == count - active, false,
		"inactive_apps.size() == count - active");
#endif
	return true;
}
00229
00230 template<typename LockActionType>
00231 string toString(LockActionType lockAction) const {
00232 unique_lock<mutex> l(lock, lockAction);
00233 stringstream result;
00234
00235 result << "----------- General information -----------" << endl;
00236 result << "max = " << max << endl;
00237 result << "count = " << count << endl;
00238 result << "active = " << active << endl;
00239 result << "inactive = " << inactiveApps.size() << endl;
00240 result << "Using global queue: " << (useGlobalQueue ? "yes" : "no") << endl;
00241 result << "Waiting on global queue: " << waitingOnGlobalQueue << endl;
00242 result << endl;
00243
00244 result << "----------- Applications -----------" << endl;
00245 ApplicationMap::const_iterator it;
00246 for (it = apps.begin(); it != apps.end(); it++) {
00247 AppContainerList *list = it->second.get();
00248 AppContainerList::const_iterator lit;
00249
00250 result << it->first << ": " << endl;
00251 for (lit = list->begin(); lit != list->end(); lit++) {
00252 AppContainer *container = lit->get();
00253 char buf[128];
00254
00255 snprintf(buf, sizeof(buf), "PID: %-8d Sessions: %d",
00256 container->app->getPid(), container->sessions);
00257 result << " " << buf << endl;
00258 }
00259 result << endl;
00260 }
00261 return result.str();
00262 }
00263
00264 bool needsRestart(const string &appRoot) {
00265 string restartFile(appRoot);
00266 restartFile.append("/tmp/restart.txt");
00267
00268 struct stat buf;
00269 bool result;
00270 int ret;
00271
00272 do {
00273 ret = stat(restartFile.c_str(), &buf);
00274 } while (ret == -1 && errno == EINTR);
00275 if (ret == 0) {
00276 do {
00277 ret = unlink(restartFile.c_str());
00278 } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
00279 if (ret == 0 || errno == ENOENT) {
00280 restartFileTimes.erase(appRoot);
00281 result = true;
00282 } else {
00283 map<string, time_t>::const_iterator it;
00284
00285 it = restartFileTimes.find(appRoot);
00286 if (it == restartFileTimes.end()) {
00287 result = true;
00288 } else {
00289 result = buf.st_mtime != restartFileTimes[appRoot];
00290 }
00291 restartFileTimes[appRoot] = buf.st_mtime;
00292 }
00293 } else {
00294 restartFileTimes.erase(appRoot);
00295 result = false;
00296 }
00297 return result;
00298 }
00299
00300 void cleanerThreadMainLoop() {
00301 this_thread::disable_syscall_interruption dsi;
00302 unique_lock<mutex> l(lock);
00303 try {
00304 while (!done && !this_thread::interruption_requested()) {
00305 xtime xt;
00306 xtime_get(&xt, TIME_UTC);
00307 xt.sec += maxIdleTime + 1;
00308 if (cleanerThreadSleeper.timed_wait(l, xt)) {
00309
00310 if (done) {
00311
00312 break;
00313 } else {
00314
00315 continue;
00316 }
00317 }
00318
00319 time_t now = InterruptableCalls::time(NULL);
00320 AppContainerList::iterator it;
00321 for (it = inactiveApps.begin(); it != inactiveApps.end(); it++) {
00322 AppContainer &container(*it->get());
00323 ApplicationPtr app(container.app);
00324 AppContainerListPtr appList(apps[app->getAppRoot()]);
00325
00326 if (now - container.lastUsed > (time_t) maxIdleTime) {
00327 P_DEBUG("Cleaning idle app " << app->getAppRoot() <<
00328 " (PID " << app->getPid() << ")");
00329 appList->erase(container.iterator);
00330
00331 AppContainerList::iterator prev = it;
00332 prev--;
00333 inactiveApps.erase(it);
00334 it = prev;
00335
00336 appInstanceCount[app->getAppRoot()]--;
00337
00338 count--;
00339 }
00340 if (appList->empty()) {
00341 apps.erase(app->getAppRoot());
00342 appInstanceCount.erase(app->getAppRoot());
00343 data->restartFileTimes.erase(app->getAppRoot());
00344 }
00345 }
00346 }
00347 } catch (const exception &e) {
00348 P_ERROR("Uncaught exception: " << e.what());
00349 }
00350 }
00351
00352
00353
00354
00355
00356
00357 pair<AppContainerPtr, AppContainerList *>
00358 spawnOrUseExisting(
00359 mutex::scoped_lock &l,
00360 const string &appRoot,
00361 bool lowerPrivilege,
00362 const string &lowestUser,
00363 const string &environment,
00364 const string &spawnMethod,
00365 const string &appType
00366 ) {
00367 beginning_of_function:
00368
00369 this_thread::disable_interruption di;
00370 this_thread::disable_syscall_interruption dsi;
00371 AppContainerPtr container;
00372 AppContainerList *list;
00373
00374 try {
00375 ApplicationMap::iterator it(apps.find(appRoot));
00376
00377 if (it != apps.end() && needsRestart(appRoot)) {
00378 AppContainerList::iterator it2;
00379 list = it->second.get();
00380 for (it2 = list->begin(); it2 != list->end(); it2++) {
00381 container = *it2;
00382 if (container->sessions == 0) {
00383 inactiveApps.erase(container->ia_iterator);
00384 } else {
00385 active--;
00386 }
00387 it2--;
00388 list->erase(container->iterator);
00389 count--;
00390 }
00391 apps.erase(appRoot);
00392 appInstanceCount.erase(appRoot);
00393 spawnManager.reload(appRoot);
00394 it = apps.end();
00395 activeOrMaxChanged.notify_all();
00396 }
00397
00398 if (it != apps.end()) {
00399 list = it->second.get();
00400
00401 if (list->front()->sessions == 0) {
00402 container = list->front();
00403 list->pop_front();
00404 list->push_back(container);
00405 container->iterator = list->end();
00406 container->iterator--;
00407 inactiveApps.erase(container->ia_iterator);
00408 active++;
00409 activeOrMaxChanged.notify_all();
00410 } else if (count >= max || (
00411 maxPerApp != 0 && appInstanceCount[appRoot] >= maxPerApp )
00412 ) {
00413 if (useGlobalQueue) {
00414 waitingOnGlobalQueue++;
00415 activeOrMaxChanged.wait(l);
00416 waitingOnGlobalQueue--;
00417 goto beginning_of_function;
00418 } else {
00419 AppContainerList::iterator it(list->begin());
00420 AppContainerList::iterator smallest(list->begin());
00421 it++;
00422 for (; it != list->end(); it++) {
00423 if ((*it)->sessions < (*smallest)->sessions) {
00424 smallest = it;
00425 }
00426 }
00427 container = *smallest;
00428 list->erase(smallest);
00429 list->push_back(container);
00430 container->iterator = list->end();
00431 container->iterator--;
00432 }
00433 } else {
00434 container = ptr(new AppContainer());
00435 {
00436 this_thread::restore_interruption ri(di);
00437 this_thread::restore_syscall_interruption rsi(dsi);
00438 container->app = spawnManager.spawn(appRoot,
00439 lowerPrivilege, lowestUser, environment,
00440 spawnMethod, appType);
00441 }
00442 container->sessions = 0;
00443 list->push_back(container);
00444 container->iterator = list->end();
00445 container->iterator--;
00446 appInstanceCount[appRoot]++;
00447 count++;
00448 active++;
00449 activeOrMaxChanged.notify_all();
00450 }
00451 } else {
00452 while (!(
00453 active < max &&
00454 (maxPerApp == 0 || appInstanceCount[appRoot] < maxPerApp)
00455 )) {
00456 activeOrMaxChanged.wait(l);
00457 }
00458 if (count == max) {
00459 container = inactiveApps.front();
00460 inactiveApps.pop_front();
00461 list = apps[container->app->getAppRoot()].get();
00462 list->erase(container->iterator);
00463 if (list->empty()) {
00464 apps.erase(container->app->getAppRoot());
00465 restartFileTimes.erase(container->app->getAppRoot());
00466 appInstanceCount.erase(container->app->getAppRoot());
00467 } else {
00468 appInstanceCount[container->app->getAppRoot()]--;
00469 }
00470 count--;
00471 }
00472 container = ptr(new AppContainer());
00473 {
00474 this_thread::restore_interruption ri(di);
00475 this_thread::restore_syscall_interruption rsi(dsi);
00476 container->app = spawnManager.spawn(appRoot, lowerPrivilege, lowestUser,
00477 environment, spawnMethod, appType);
00478 }
00479 container->sessions = 0;
00480 it = apps.find(appRoot);
00481 if (it == apps.end()) {
00482 list = new AppContainerList();
00483 apps[appRoot] = ptr(list);
00484 appInstanceCount[appRoot] = 1;
00485 } else {
00486 list = it->second.get();
00487 appInstanceCount[appRoot]++;
00488 }
00489 list->push_back(container);
00490 container->iterator = list->end();
00491 container->iterator--;
00492 count++;
00493 active++;
00494 activeOrMaxChanged.notify_all();
00495 }
00496 } catch (const SpawnException &e) {
00497 string message("Cannot spawn application '");
00498 message.append(appRoot);
00499 message.append("': ");
00500 message.append(e.what());
00501 if (e.hasErrorPage()) {
00502 throw SpawnException(message, e.getErrorPage());
00503 } else {
00504 throw SpawnException(message);
00505 }
00506 } catch (const exception &e) {
00507 string message("Cannot spawn application '");
00508 message.append(appRoot);
00509 message.append("': ");
00510 message.append(e.what());
00511 throw SpawnException(message);
00512 }
00513
00514 return make_pair(container, list);
00515 }
00516
00517 public:
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538 StandardApplicationPool(const string &spawnServerCommand,
00539 const string &logFile = "",
00540 const string &rubyCommand = "ruby",
00541 const string &user = "")
00542 :
00543 #ifndef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00544 spawnManager(spawnServerCommand, logFile, rubyCommand, user),
00545 #endif
00546 data(new SharedData()),
00547 lock(data->lock),
00548 activeOrMaxChanged(data->activeOrMaxChanged),
00549 apps(data->apps),
00550 max(data->max),
00551 count(data->count),
00552 active(data->active),
00553 maxPerApp(data->maxPerApp),
00554 inactiveApps(data->inactiveApps),
00555 restartFileTimes(data->restartFileTimes),
00556 appInstanceCount(data->appInstanceCount)
00557 {
00558 detached = false;
00559 done = false;
00560 max = DEFAULT_MAX_POOL_SIZE;
00561 count = 0;
00562 active = 0;
00563 useGlobalQueue = false;
00564 waitingOnGlobalQueue = 0;
00565 maxPerApp = DEFAULT_MAX_INSTANCES_PER_APP;
00566 maxIdleTime = DEFAULT_MAX_IDLE_TIME;
00567 cleanerThread = new thread(
00568 bind(&StandardApplicationPool::cleanerThreadMainLoop, this),
00569 CLEANER_THREAD_STACK_SIZE
00570 );
00571 }
00572
00573 virtual ~StandardApplicationPool() {
00574 if (!detached) {
00575 this_thread::disable_interruption di;
00576 {
00577 mutex::scoped_lock l(lock);
00578 done = true;
00579 cleanerThreadSleeper.notify_one();
00580 }
00581 cleanerThread->join();
00582 }
00583 delete cleanerThread;
00584 }
00585
00586 virtual Application::SessionPtr get(
00587 const string &appRoot,
00588 bool lowerPrivilege = true,
00589 const string &lowestUser = "nobody",
00590 const string &environment = "production",
00591 const string &spawnMethod = "smart",
00592 const string &appType = "rails"
00593 ) {
00594 using namespace boost::posix_time;
00595 unsigned int attempt = 0;
00596 ptime timeLimit(get_system_time() + millisec(GET_TIMEOUT));
00597 unique_lock<mutex> l(lock);
00598
00599 while (true) {
00600 attempt++;
00601
00602 pair<AppContainerPtr, AppContainerList *> p(
00603 spawnOrUseExisting(l, appRoot, lowerPrivilege, lowestUser,
00604 environment, spawnMethod, appType)
00605 );
00606 AppContainerPtr &container(p.first);
00607 AppContainerList &list(*p.second);
00608
00609 container->lastUsed = time(NULL);
00610 container->sessions++;
00611
00612 P_ASSERT(verifyState(), Application::SessionPtr(),
00613 "State is valid:\n" << toString(false));
00614 try {
00615 return container->app->connect(SessionCloseCallback(data, container));
00616 } catch (const exception &e) {
00617 container->sessions--;
00618 if (attempt == MAX_GET_ATTEMPTS) {
00619 string message("Cannot connect to an existing "
00620 "application instance for '");
00621 message.append(appRoot);
00622 message.append("': ");
00623 try {
00624 const SystemException &syse =
00625 dynamic_cast<const SystemException &>(e);
00626 message.append(syse.sys());
00627 } catch (const bad_cast &) {
00628 message.append(e.what());
00629 }
00630 throw IOException(message);
00631 } else {
00632 list.erase(container->iterator);
00633 if (list.empty()) {
00634 apps.erase(appRoot);
00635 appInstanceCount.erase(appRoot);
00636 }
00637 count--;
00638 active--;
00639 activeOrMaxChanged.notify_all();
00640 P_ASSERT(verifyState(), Application::SessionPtr(),
00641 "State is valid.");
00642 }
00643 }
00644 }
00645
00646 return Application::SessionPtr();
00647 }
00648
00649 virtual void clear() {
00650 mutex::scoped_lock l(lock);
00651 apps.clear();
00652 inactiveApps.clear();
00653 restartFileTimes.clear();
00654 appInstanceCount.clear();
00655 count = 0;
00656 active = 0;
00657 }
00658
00659 virtual void setMaxIdleTime(unsigned int seconds) {
00660 mutex::scoped_lock l(lock);
00661 maxIdleTime = seconds;
00662 cleanerThreadSleeper.notify_one();
00663 }
00664
00665 virtual void setMax(unsigned int max) {
00666 mutex::scoped_lock l(lock);
00667 this->max = max;
00668 activeOrMaxChanged.notify_all();
00669 }
00670
00671 virtual unsigned int getActive() const {
00672 return active;
00673 }
00674
00675 virtual unsigned int getCount() const {
00676 return count;
00677 }
00678
00679 virtual void setMaxPerApp(unsigned int maxPerApp) {
00680 mutex::scoped_lock l(lock);
00681 this->maxPerApp = maxPerApp;
00682 activeOrMaxChanged.notify_all();
00683 }
00684
00685 virtual void setUseGlobalQueue(bool value) {
00686 this->useGlobalQueue = value;
00687 }
00688
00689 virtual pid_t getSpawnServerPid() const {
00690 return spawnManager.getServerPid();
00691 }
00692
00693
00694
00695
00696
00697 virtual string toString(bool lockMutex = true) const {
00698 if (lockMutex) {
00699 return toString(boost::adopt_lock);
00700 } else {
00701 return toString(boost::defer_lock);
00702 }
00703 }
00704 };
00705
00706 }
00707
00708 #endif
00709