Ищется библиотека либо академическая статья, по которой можно написать такое.
есть тред, который читает с сокета и складывает задания в очередь.
Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.
В принципе уже сам реализовал, но попахивает велосипедом. Код работает плохо, когда данные приходят очень медленно (очередь всегда пустая) и я закрываю соединение. треды в пуле повисают на семафоре и все...
Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
Здравствуйте, OdesitVadim, Вы писали:
OV>Ищется библиотека либо академическая статья, по которой можно написать такое. OV>есть тред, который читает с сокета и складывает задания в очередь. OV>Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.
OV>В принципе уже сам реализовал, но попахивает велосипедом. Код работает плохо, когда данные приходят очень медленно (очередь всегда пустая) и я закрываю соединение. треды в пуле повисают на семафоре и все...
OV>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
Здравствуйте, remark, Вы писали:
R>Здравствуйте, OdesitVadim, Вы писали:
[scip] OV>>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
R>http://www.rsdn.ru/forum/cpp.applied/3783316.1.aspx
R>
Мой код близкий к этому. И собственно вся работа кода хорошо отлажена. Проблема возникает в момент завершения работы основного треда. Нужно ведь подчистить очередь, подчистить треды...
Здравствуйте, Аноним, Вы писали:
R>>Здравствуйте, OdesitVadim, Вы писали: А>[scip] OV>>>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
R>>http://www.rsdn.ru/forum/cpp.applied/3783316.1.aspx
А>Мой код близкий к этому. И собственно вся работа кода хорошо отлажена. Проблема возникает в момент завершения работы основного треда. Нужно ведь подчистить очередь, подчистить треды...
А в чём именно проблема?
Установить всем потокам флаги на остановку. Потоки перед завершением снимают все эелменты из своей очереди и удаляют. Заджоинить потоки. Всё.
Здравствуйте, remark, Вы писали:
R>А в чём именно проблема? R>Установить всем потокам флаги на остановку. Потоки перед завершением снимают все эелменты из своей очереди и удаляют. Заджоинить потоки. Всё.
R>
Ну так это я тоже понимаю. Потоки висли на этом
Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.
Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().
Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.
Здравствуйте, remark, Вы писали:
R>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла. R>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().
Здравствуйте, OdesitVadim, Вы писали:
OV>Но так как очередь пустая, то там и оставалось. пришлось добавить выход с этого цикла по условной переменной.
Есть следующий подход : функция dequeue ожидает наступление одного из двух событий:
1. появился новый запрос для выполнения
2. запросили прервать всю работу.
соответственно, код должен быть такой (мета-код):
Здравствуйте, remark, Вы писали:
R>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.
если я правильно мануалы вычитал, то она просто рассылает сигнал всем, кто ожидает. R>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast(). R>Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.
стоп, стоп. какая переменная? это же функция как минимум? и если она рассылает сообщение, то она не такая уже и бессмысленная.
R>
Здравствуйте, OdesitVadim, Вы писали:
R>>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла. OV>если я правильно мануалы вычитал, то она просто рассылает сигнал всем, кто ожидает. R>>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast(). R>>Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.
OV>стоп, стоп. какая переменная? это же функция как минимум? и если она рассылает сообщение, то она не такая уже и бессмысленная.
Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно.
Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания.
Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.
Здравствуйте, uzhas, Вы писали:
U>Есть следующий подход : функция dequeue ожидает наступление одного из двух событий: U>1. появился новый запрос для выполнения U>2. запросили прервать всю работу. U>соответственно, код должен быть такой (мета-код): U>
Здравствуйте, remark, Вы писали:
R>Здравствуйте, посетитель /life/, Вы писали:
ПL>>А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.
R>Условная переменная отпускает лок на время ожидания.
R>
А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?
Здравствуйте, посетитель /life/, Вы писали:
ПL>>>А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.
R>>Условная переменная отпускает лок на время ожидания.
ПL>А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?
Но там вроде ничего и не лочится, и ничего и не надо лочить...
Здравствуйте, remark, Вы писали:
ПL>>А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?
R>Но там вроде ничего и не лочится,
Мне показалось, что выделенный комментарий означает, что лочится: U>
U>request* dequeue()
U>{
U> //locks
U> while (wait_events(quitEvent, newRequestEvent) == NEW_REQUEST_EVENT)
U> {
U> return pop_request();
U> }
U> return 0;
U>}
U>
R>и ничего и не надо лочить...
Почему не надо? Ведь тогда enqueue и dequeue будут не синхронизированы.
Я так понял, что событие newRequestEvent используется по аналогии с условной переменной — срабатывает при помещении объекта в пустую очередь. Или там другая схема?
Здравствуйте, посетитель /life/, Вы писали:
R>>и ничего и не надо лочить... ПL>Почему не надо? Ведь тогда enqueue и dequeue будут не синхронизированы. ПL>Я так понял, что событие newRequestEvent используется по аналогии с условной переменной — срабатывает при помещении объекта в пустую очередь. Или там другая схема?
Предлагаю приостановить обсуждение пока не будет полного кода.
R>Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно. R>Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания. R>Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.
R>
При таком объяснении, получается, что ввели функцию, которая никому не нужна так как она ничего не делает. Хотя есть маленький кусочек "без изменения состояния"
Мы говорим о разных условных переменных. Я о конкретной реализации в Linux на базе pthread. Вот кусочек описания
pthread_cond_wait() puts the current thread to sleep. It requires a mutex of the associated shared resource value it is waiting on. pthread_cond_signal() signals one thread out of the possibly many sleeping threads to wakeup. pthread_cond_broadcast() signals all threads waiting on the cond condition variable to wakeup. Here is an example on using pthread condition variables:
Откуда получается, что pthread_cond_signal будит один с потоков (произвольный), который ожидает на вызове pthread_cond_wait. А pthread_cond_broadcast — это фактически в рассылка всем тредам, которые висят на pthread_cond_wait сигнала. Меня лично это поведение полностью устраивает.
Здравствуйте, OdesitVadim, Вы писали:
OV>Доброго дня.
OV>Ищется библиотека либо академическая статья, по которой можно написать такое. OV>есть тред, который читает с сокета и складывает задания в очередь. OV>Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.
OV>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
Можешь попробовать мой вариант — thread_pool.hpp
Работает под Win/Linux.
Использовать примерно так (код не рабочий, только идея):
void threadProc(int n){
std::cout << "processed: " << res << std::endl;
}
ThreadPool<int> threadPool(threadProc, 20);
// main work cycle
while (true)
{
...
socket_type clientSock = accept (serverSock, (sockaddr* ) &clientAddr, &clientAddrLen);
int command = util::readInt (clientSock);
util::closeSocket (clientSock);
if (command == EXIT_COMMAND)
break;
threadPool.process (command);
}
...
threadPool.stop(true);
...
util::closeSocket (serverSock);
...
Здравствуйте, OdesitVadim, Вы писали:
R>>Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно. R>>Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания. R>>Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.
OV>При таком объяснении, получается, что ввели функцию, которая никому не нужна так как она ничего не делает. Хотя есть маленький кусочек "без изменения состояния"
В этом кусочке и суть.
OV>Мы говорим о разных условных переменных. Я о конкретной реализации в Linux на базе pthread. Вот кусочек описания
OV>
OV>pthread_cond_wait() puts the current thread to sleep. It requires a mutex of the associated shared resource value it is waiting on. pthread_cond_signal() signals one thread out of the possibly many sleeping threads to wakeup. pthread_cond_broadcast() signals all threads waiting on the cond condition variable to wakeup. Here is an example on using pthread condition variables:
OV>Откуда получается, что pthread_cond_signal будит один с потоков (произвольный), который ожидает на вызове pthread_cond_wait. А pthread_cond_broadcast — это фактически в рассылка всем тредам, которые висят на pthread_cond_wait сигнала. Меня лично это поведение полностью устраивает.
Мы говорим об одной и той же условной переменной.
Обрати внимание на кусок "a mutex of the associated shared resource value it is waiting on".
Условная переменная сигнализирует об изменении состояния, если состояние не изменилось, то сигнализирование бессмысленно. Ожидание на условной переменной всегда делается в цикле, который проверяет состояние. При сигнализировании другой поток действительно проснётся, но сразу же увивдит, что состояние не изменилось, и уснёт обратно. Выход нулевой.
Интерфейс POSIX ограничен С, если же ты посмотришь любую нормальную реализацию условных переменных для С++, то они принимают предикат, которого ждём. При использовании такого АПИ, ожидающий поток даже и не выйдёт из фунукции ожидания при сигнализировании без изменения сосотояния предикаита.
Здравствуйте, remark, Вы писали:
[scip] R>Мы говорим об одной и той же условной переменной.
[scip] R>Интерфейс POSIX ограничен С,
Но работает так как я написал? R>если же ты посмотришь любую нормальную реализацию условных переменных для С++, то они принимают предикат, которого ждём. При использовании такого АПИ, ожидающий поток даже и не выйдёт из фунукции ожидания при сигнализировании без изменения сосотояния предикаита.
тоесть буста?
R>