00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <scalestack/database_proxy/client.h>
00027 #include <scalestack/database_proxy/client_service.h>
00028 #include <scalestack/database_proxy/connection.h>
00029 #include <scalestack/kernel/module.h>
00030
00031 namespace scalestack
00032 {
00033 namespace database_proxy
00034 {
00035
00036
00037
00038
00039
00040 client::client(client_service& client_service):
00041 network::stream(client_service),
00042 _state(),
00043 _next_state(),
00044 _need_yield(false),
00045 _shutdown(false),
00046 _size(),
00047 _packet_size(),
00048 _buffer(),
00049 _connection()
00050 {
00051 }
00052
00053 void client::set_connection(database_proxy::connection* bound_connection,
00054 bool need_handshake)
00055 {
00056 _connection = bound_connection;
00057
00058 if (_connection == NULL)
00059 return;
00060
00061 if (need_handshake && _state != SERVER_HANDSHAKE_PACKET &&
00062 _state != SERVER_HANDSHAKE_FLUSH)
00063 {
00064 _state = SERVER_HANDSHAKE_PACKET;
00065 reconnect();
00066 }
00067 else
00068 run_now();
00069 }
00070
00071 void client::yield(void)
00072 {
00073 _need_yield = true;
00074 }
00075
00076 void client::connected(void)
00077 {
00078 if (_connection == NULL)
00079 return;
00080
00081 _connection->continue_write();
00082 }
00083
00084 size_t client::read(uint8_t* buffer, size_t size)
00085 {
00086 _size = size;
00087 _buffer = buffer;
00088 return _connection_write();
00089 }
00090
00091 void client::continue_write(void)
00092 {
00093 consume(_connection_write());
00094 }
00095
00096 void client::flush_write(void)
00097 {
00098 _connection->continue_write();
00099 }
00100
00101 void client::error(void)
00102 {
00103 _connection->error();
00104 }
00105
00106 void client::run(void)
00107 {
00108 if (_shutdown)
00109 {
00110 _connection->run_now();
00111 stop();
00112 }
00113 else
00114 _connection->continue_write();
00115 }
00116
00117
00118
00119
00120
00121 size_t client::_connection_write(void)
00122 {
00123 if (_connection == NULL)
00124 return 0;
00125
00126 size_t written = 0;
00127 size_t total_written = 0;
00128
00129 switch (_state)
00130 {
00131 case SERVER_HANDSHAKE_PACKET:
00132 if (_size < 4)
00133 return 0;
00134
00135 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00136 _state = SERVER_HANDSHAKE_FLUSH;
00137 _module.log_debug("S-HAND packet: %u", _packet_size);
00138
00139 case SERVER_HANDSHAKE_FLUSH:
00140 total_written = _connection->write(_buffer, _packet_size, true);
00141 _size -= total_written;
00142 _buffer += total_written;
00143
00144 _packet_size -= total_written;
00145 if (_packet_size == 0)
00146 _state = RESULT_PACKET;
00147
00148 break;
00149
00150 case RESULT_PACKET:
00151 if (_size < 5)
00152 return 0;
00153
00154 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00155 _state = RESULT_FLUSH;
00156 _module.log_debug("RESULT packet: %u", _packet_size);
00157
00158 if (_buffer[4] == 0 || _buffer[4] >= 254)
00159 _next_state = RESULT_PACKET;
00160 else
00161 _next_state = COLUMN_PACKET;
00162
00163 case RESULT_FLUSH:
00164 total_written = _connection->write(_buffer, _packet_size,
00165 _next_state == RESULT_PACKET);
00166 _size -= total_written;
00167 _buffer += total_written;
00168
00169 _packet_size -= total_written;
00170 if (_packet_size == 0)
00171 _state = _next_state;
00172
00173 if (_state != COLUMN_PACKET)
00174 break;
00175
00176 case COLUMN_PACKET:
00177 while (1)
00178 {
00179 if (_size < 5)
00180 return 0;
00181
00182 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00183 _state = COLUMN_FLUSH;
00184 _module.log_debug("COLUMN packet: %u", _packet_size);
00185
00186 if (_packet_size == 9 && _buffer[4] == 254)
00187 _next_state = ROW_PACKET;
00188 else
00189 _next_state = COLUMN_PACKET;
00190
00191 case COLUMN_FLUSH:
00192 written = _connection->write(_buffer, _packet_size, false);
00193 _size -= written;
00194 _buffer += written;
00195 total_written += written;
00196
00197 _packet_size -= written;
00198 if (_packet_size == 0)
00199 _state = _next_state;
00200
00201 if (_state == ROW_PACKET)
00202 break;
00203 }
00204
00205 case ROW_PACKET:
00206 while (1)
00207 {
00208 if (_size < 5)
00209 return 0;
00210
00211 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00212 _state = ROW_FLUSH;
00213 _module.log_debug("ROW packet: %u", _packet_size);
00214
00215 if (_packet_size == 9 && _buffer[4] == 254)
00216 _next_state = RESULT_PACKET;
00217 else
00218 _next_state = ROW_PACKET;
00219
00220 case ROW_FLUSH:
00221 written = _connection->write(_buffer, _packet_size,
00222 _next_state == RESULT_PACKET);
00223 _size -= written;
00224 _buffer += written;
00225 total_written += written;
00226
00227 _packet_size -= written;
00228 if (_packet_size == 0)
00229 _state = _next_state;
00230
00231 if (_state == RESULT_PACKET)
00232 break;
00233 }
00234 }
00235
00236 if (_need_yield && _state == RESULT_PACKET && _packet_size == 0)
00237 {
00238 _need_yield = false;
00239 _connection->yield();
00240 }
00241
00242 return total_written;
00243 }
00244
00245 }
00246 }