00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <cstring>
00027
00028 #include <scalestack/database_proxy/service.h>
00029 #include <scalestack/database_proxy/client.h>
00030 #include <scalestack/database_proxy/connection.h>
00031 #include <scalestack/kernel/module.h>
00032
00033 namespace scalestack
00034 {
00035 namespace database_proxy
00036 {
00037
00038
00039
00040
00041
00042 connection::connection(service& service):
00043 network::stream(service),
00044 _state(),
00045 _need_handshake(true),
00046 _need_yield(false),
00047 _in_transaction(false),
00048 _shutdown(false),
00049 _size(),
00050 _packet_size(),
00051 _buffer(),
00052 _client()
00053 {
00054 _client = service.get_client(this, _need_handshake);
00055 }
00056
00057 size_t connection::read(uint8_t* buffer, size_t size)
00058 {
00059 _size = size;
00060 _buffer = buffer;
00061 return _client_write();
00062 }
00063
00064 void connection::continue_write(void)
00065 {
00066 consume(_client_write());
00067 }
00068
00069 void connection::flush_write(void)
00070 {
00071 if (_client != NULL)
00072 _client->continue_write();
00073 }
00074
00075 void connection::error(void)
00076 {
00077 _shutdown = true;
00078 run_now();
00079 }
00080
00081 void connection::run(void)
00082 {
00083 if (_shutdown)
00084 {
00085 if (_client == NULL)
00086 {
00087 static_cast<service&>(_stream_service).remove_from_queue(this);
00088 stop();
00089 return;
00090 }
00091
00092 static_cast<service&>(_stream_service).yield_client(_client);
00093 _client = NULL;
00094 return;
00095 }
00096
00097 if (_client == NULL)
00098 _client = static_cast<service&>(_stream_service).get_client(this, _need_handshake);
00099 else if (_need_yield)
00100 {
00101 _need_yield = false;
00102 static_cast<service&>(_stream_service).yield_client(_client);
00103 _client = NULL;
00104 }
00105 }
00106
00107 void connection::yield(void)
00108 {
00109 _need_yield = true;
00110 run_now();
00111 }
00112
00113
00114
00115
00116
00117 size_t connection::_client_write(void)
00118 {
00119 if (_shutdown)
00120 return 0;
00121
00122 if (_need_yield)
00123 {
00124 _need_yield = false;
00125 static_cast<service&>(_stream_service).yield_client(_client);
00126 _client = NULL;
00127 }
00128
00129 if (_client == NULL)
00130 {
00131 _client = static_cast<service&>(_stream_service).get_client(this, _need_handshake);
00132
00133 return 0;
00134 }
00135
00136 size_t written = 0;
00137
00138 switch (_state)
00139 {
00140 case CLIENT_HANDSHAKE_PACKET:
00141 if (_size < 4)
00142 return 0;
00143
00144 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00145 _state = CLIENT_HANDSHAKE_FLUSH;
00146 _module.log_debug("C-HAND packet: %u", _packet_size);
00147
00148 case CLIENT_HANDSHAKE_FLUSH:
00149 written = _client->write(_buffer, _packet_size, true);
00150 _size -= written;
00151 _buffer += written;
00152
00153 _packet_size -= written;
00154 if (_packet_size == 0)
00155 {
00156 _state = COMMAND_PACKET;
00157 _need_handshake = false;
00158 }
00159
00160 break;
00161
00162 case COMMAND_PACKET:
00163 if (_size < 4)
00164 return 0;
00165
00166 _packet_size = 4 + (_buffer[0] | (static_cast<size_t>(_buffer[1]) << 8) | (static_cast<size_t>(_buffer[2]) << 16));
00167 _state = COMMAND_FLUSH;
00168 _module.log_debug("COMMAND packet: %u", _packet_size);
00169 if (_packet_size == 10 && _size >= 10 && !memcmp("BEGIN", _buffer + 5, 5))
00170 _in_transaction = true;
00171 else if (!_in_transaction ||
00172 (_packet_size == 11 && _size >= 11 &&
00173 !memcmp("COMMIT", _buffer + 5, 6)))
00174 {
00175 _in_transaction = false;
00176 }
00177
00178 if (_packet_size == 5 && _buffer[4] == 1)
00179 {
00180 error();
00181 return 5;
00182 }
00183
00184 case COMMAND_FLUSH:
00185 written = _client->write(_buffer, _packet_size, true);
00186 _size -= written;
00187 _buffer += written;
00188
00189 _packet_size -= written;
00190 if (_packet_size == 0)
00191 _state = COMMAND_PACKET;
00192
00193 break;
00194 }
00195
00196 if (!_in_transaction && _packet_size == 0)
00197 _client->yield();
00198
00199 return written;
00200 }
00201
00202 }
00203 }