|
|
Многопоточный сервер на пуле потоков
В предыдущей статье мы рассмотрели принцип построения многопоточного сервера
для клиентов, предполагающих постоянное удержание сокета в открытом состоянии. Такой подход применим при условии, что вы знаете примерное количество
подключаемых клиентов, и сможете примерно оценить нагруженность сервера. К примеру, работа сервера в ограниченной локальной сети.
Если мы ограничены в ресурсах, а количество подключаемых клиентов нам неизвестна, разумно использовать пул потоков. Тикловская
Библиотека Thread, предоставляет возможность работы с пулом потоков. Основное отличие такой организации сервера в том, что пул будет существовать
все время пока работает программа и он в способности обслужить запросы от разных клиентов. При этом, после обработки запроса от клиента, необходимо
чтобы клиент закрыл сетевое соединение, в тоже время, необходимо предусмотреть возможность автоматического разрыва соединения сокета сервером,
для случая удержания сокета клиентом. Иначе, пул постоянно будет занят одним и тем же клиентом и не будет способен обрабатывать запросы других клиентов,
а такой режим работы ничем не отличается от многопоточного сервера без пула потоков.
# загружаем библиотеку потоков
package require Thread
# запускаем слушатель на 7000 tcp порту
# при получении запроса от клиента, будет запускаться процедура _Accept
socket -server _Accept 7000
# входим в обработчик tcl событий
# это важная строка, без неё tcl-приложение завершится.
# Для Tk-приложения данная строка не нужна.
vwait forever
# определяем количество пулов
# У меня 5 пулов успешно обрабаиывают запросы от 400-та клиентов
set Main(TcpPool,maxjobs) 5
# Создаем пул потоков на пять пулов
set Main(tcptpid) [tpool::create -minworkers $Main(TcpPool,maxjobs) -maxworkers $Main(TcpPool,maxjobs)) -initcmd {
# инициализируем переменную, хранящую количество активных пулов
tsv::set TcpTpool CountJobs 0
# загружаем требуемые внешние библиотеки
foreach { p } { Pgtcl tls md5 crc32 } {
package require $p
}
# загружаем исходный код приложения с других файлов
foreach { s } { lib mainthreadtcp protocol protocolt } {
uplevel #0 source $s.tcl
}
}]
# подготавливаемся к передаче обработки сокета с главного интерпретатора в свободный (не занятый) пул
proc _Accept {idsocket ipaddr port} {
# выполняем задание только в свободные циклы работы интерпретатора
after idle [list Accept $idsocket $ipaddr $port]
}
# Собственно, передаем сокет в свободный пул
proc Accept { idsocket addr port } {
global Main
# проверяем, есть ли свободные пулы
if { [tsv::get TcpTpool CountJobs] > $Main(TcpPool,maxjobs) } {
# свободных пулов нет. Закрываем сокет. Отказываем в обработке запроса от клиента
close $idsocket
return
}
# передаем сокет с главного интерпретатора в интерпретатор порождённого потока
tsv::incr TcpTpool CountJobs
thread::detach $idsocket
tpool::post -detached -nowait $Main(tcptpid) [list Accept:Thread $addr $idsocket]
}
# процедура создания событийного чтения данных из сокета
proc Accept:Thread { addr idsocket } {
global ProcedureExec
# Эти две строки инициализируют переменные для, отложенного во времени, завершения пула
set handler [clock click -microseconds]
set ProcedureExec($handler) -2
# проверяем, можно ли закрыть сокет и освободить пул, спустя 60 секунд от запроса клиента
after 60000 Timer:CheckConnectedSocket $handler $idsocket $addr
# подсоединяем, ранее отсоединённый от главного интерпретатора, сокет в текущий пул
thread::attach $idsocket
# конфигурируем сокет
# важно, для событийной обработки чтения данных с сокета, сокет должен быть в неблокирующем режиме
chan configure $idsocket -blocking 0 -buffering line -encoding binary -translation binary
# создаём событийный обработчик чтения данных с потока
# при возникновении данных на сокете, запускается процедура SocketEventHandler:Thread
chan event $idsocket readable [list SocketEventHandler:Thread $handler $idsocket $addr]
# завершаем пул ТОЛЬКО по событию изменения переменной
vwait Main($idsocket,IsRun)
}
# проверяем, можно ли закрыть сокет и освободить пул
proc Timer:CheckConnectedSocket { handler idsocket addr } {
global ProcedureExec
if { $ProcedureExec($handler) == -1 } return
if { $ProcedureExec($handler) in "-2 1" } {
# проверяем, не выполняется ли в текущий момент передача данных от сервера клиенту
set p [chan pending output $idsocket]
if { $p == 0 } {
# передача данных клиенту завершена. Освобождаем пул
Exit:Thread $handler $idsocket
return
}
}
# передача данных клиенту продолжается. Проверяем завершение передачи через одну секунду
after 1000 Timer:CheckConnectedSocket $handler $idsocket $addr
}
proc Exit:Thread { handler idsocket } {
global ProcedureExec
set ProcedureExec($handler) -1
# декраментируем количество занятых пулов
tsv::incr TcpTpool CountJobs -1
# останавливаем событийный обработчик чтения данных с сокета
catch { chan event $idsocket readable {} }
# закрываем сокет
catch { chan close $idsocket }
# изменяем значение переменной для остановки пула
set Main($idsocket,IsRun) 0
}
# читаем данные с сокета
proc SocketEventHandler:Thread { handler idsocket addr } {
global ProcedureExec
if { [chan eof $idsocket] || [catch {chan gets $idsocket pkt} err] != 0 } {
Exit:Thread $handler $idsocket
return
} else {
# здесь помещаем код по обработке полученного пакета
# переменная pkt содержит полученные данные от клиента
set ProcedureExec($handler) 1
}
}
|