00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <algorithm>
00027
00028 #include <scalestack/database_proxy/client.h>
00029 #include <scalestack/database_proxy/client_service.h>
00030 #include <scalestack/database_proxy/connection.h>
00031 #include <scalestack/kernel/module.h>
00032 #include <scalestack/kernel/option.h>
00033
00034 using namespace std;
00035
00036 namespace scalestack
00037 {
00038 namespace database_proxy
00039 {
00040
00041
00042
00043
00044
00045 client_service::client_service(kernel::module& module):
00046 network::stream_service(module),
00047 _max_clients(_module.get_option("max_clients").get_size_value()),
00048 _current_clients(),
00049 _queue_mutex(),
00050 _waiting_connections(),
00051 _free_clients(),
00052 _connection_service(module, *this)
00053 {
00054 int return_code = pthread_mutex_init(&_queue_mutex, NULL);
00055 if (return_code != 0)
00056 _module.log_fatal("Failed pthread_mutex_init: %d", return_code);
00057 }
00058
00059 client_service::~client_service()
00060 {
00061 int return_code = pthread_mutex_destroy(&_queue_mutex);
00062 if (return_code != 0)
00063 _module.log_error("Failed pthread_mutex_destroy: %d", return_code);
00064 }
00065
00066 network::stream* client_service::add_stream(void)
00067 {
00068 return new client(*this);
00069 }
00070
00071 void client_service::remove_stream(network::stream* old_stream)
00072 {
00073 pthread_mutex_lock(&_queue_mutex);
00074 --_current_clients;
00075 pthread_mutex_unlock(&_queue_mutex);
00076 delete old_stream;
00077 }
00078
00079 client* client_service::get_client(connection* bound_connection, bool need_handshake)
00080 {
00081 pthread_mutex_lock(&_queue_mutex);
00082
00083 client* new_client;
00084 vector<connection*>::reverse_iterator found_connection;
00085 found_connection = find(_waiting_connections.rbegin(),
00086 _waiting_connections.rend(),
00087 bound_connection);
00088
00089 if (_free_clients.empty() ||
00090 (_current_clients != _max_clients && need_handshake))
00091 {
00092 if (_current_clients == _max_clients)
00093 {
00094 if (found_connection == _waiting_connections.rend())
00095 _waiting_connections.insert(_waiting_connections.begin(), bound_connection);
00096 pthread_mutex_unlock(&_queue_mutex);
00097 return NULL;
00098 }
00099
00100 new_client = static_cast<client*>(_connection_service.add_connection(_module.get_option("client_address").get_value()));
00101 ++_current_clients;
00102 }
00103 else
00104 {
00105 new_client = _free_clients.back();
00106 _free_clients.pop_back();
00107 }
00108
00109 if (found_connection != _waiting_connections.rend())
00110 _waiting_connections.erase(found_connection.base() - 1);
00111
00112 pthread_mutex_unlock(&_queue_mutex);
00113
00114 new_client->set_connection(bound_connection, need_handshake);
00115 return new_client;
00116 }
00117
00118 void client_service::yield_client(client* bound_client)
00119 {
00120 pthread_mutex_lock(&_queue_mutex);
00121
00122 _free_clients.push_back(bound_client);
00123 if (!_waiting_connections.empty())
00124 _waiting_connections.back()->run_now();
00125
00126 pthread_mutex_unlock(&_queue_mutex);
00127 }
00128
00129 void client_service::remove_from_queue(connection* bound_connection)
00130 {
00131 pthread_mutex_lock(&_queue_mutex);
00132
00133 vector<connection*>::iterator found_connection;
00134 found_connection = find(_waiting_connections.begin(),
00135 _waiting_connections.end(),
00136 bound_connection);
00137 if (found_connection != _waiting_connections.end())
00138 _waiting_connections.erase(found_connection);
00139
00140 pthread_mutex_unlock(&_queue_mutex);
00141 }
00142
00143 }
00144 }