реализация читатели-писатель
От: OdesitVadim Украина  
Дата: 26.04.10 14:25
Оценка:
Доброго дня.

Ищется библиотека либо академическая статья, по которой можно написать такое.
есть тред, который читает с сокета и складывает задания в очередь.
Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.

В принципе уже сам реализовал, но попахивает велосипедом. Код работает плохо, когда данные приходят очень медленно (очередь всегда пустая) и я закрываю соединение. треды в пуле повисают на семафоре и все...

Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.
Re: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 26.04.10 14:32
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Ищется библиотека либо академическая статья, по которой можно написать такое.

OV>есть тред, который читает с сокета и складывает задания в очередь.
OV>Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.

OV>В принципе уже сам реализовал, но попахивает велосипедом. Код работает плохо, когда данные приходят очень медленно (очередь всегда пустая) и я закрываю соединение. треды в пуле повисают на семафоре и все...


OV>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.


http://www.rsdn.ru/forum/cpp.applied/3783316.1.aspx
Автор: remark
Дата: 22.04.10



1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: реализация читатели-писатель
От: Аноним  
Дата: 27.04.10 08:22
Оценка:
Здравствуйте, remark, Вы писали:

R>Здравствуйте, OdesitVadim, Вы писали:

[scip]
OV>>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.

R>http://www.rsdn.ru/forum/cpp.applied/3783316.1.aspx
Автор: remark
Дата: 22.04.10


R>

Мой код близкий к этому. И собственно вся работа кода хорошо отлажена. Проблема возникает в момент завершения работы основного треда. Нужно ведь подчистить очередь, подчистить треды...
Re[3]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 08:40
Оценка:
Здравствуйте, Аноним, Вы писали:

R>>Здравствуйте, OdesitVadim, Вы писали:

А>[scip]
OV>>>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.

R>>http://www.rsdn.ru/forum/cpp.applied/3783316.1.aspx
Автор: remark
Дата: 22.04.10


А>Мой код близкий к этому. И собственно вся работа кода хорошо отлажена. Проблема возникает в момент завершения работы основного треда. Нужно ведь подчистить очередь, подчистить треды...


А в чём именно проблема?
Установить всем потокам флаги на остановку. Потоки перед завершением снимают все эелменты из своей очереди и удаляют. Заджоинить потоки. Всё.


1024cores — all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[4]: реализация читатели-писатель
От: OdesitVadim Украина  
Дата: 27.04.10 11:10
Оценка:
Здравствуйте, remark, Вы писали:

R>А в чём именно проблема?

R>Установить всем потокам флаги на остановку. Потоки перед завершением снимают все эелменты из своей очереди и удаляют. Заджоинить потоки. Всё.

R>

Ну так это я тоже понимаю. Потоки висли на этом
request* dequeue()
  {
      boost::unique_lock<boost::mutex> lock (mtx);
      while (queue.empty())
        cv.wait(lock); //<====== 
      request* req = queue.front();
      queue.pop_front();
      return req;
  }

Я при завершении рассылал им broadcast. Но так как очередь пустая, то там и оставалось. пришлось добавить выход с этого цикла по условной переменной.

Сейчас все вроде работает. будет тестировать под нагрузкой.
Re[5]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 11:16
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Ну так это я тоже понимаю. Потоки висли на этом

OV>
OV>request* dequeue()
OV>  {
OV>      boost::unique_lock<boost::mutex> lock (mtx);
OV>      while (queue.empty())
OV>        cv.wait(lock); //<====== 
OV>      request* req = queue.front();
OV>      queue.pop_front();
OV>      return req;
OV>  }
OV>


Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.
Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().
Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[6]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 11:17
Оценка:
Здравствуйте, remark, Вы писали:

R>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.

R>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().

---------------/\/\/\/\/\
какого-то флага


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[5]: реализация читатели-писатель
От: uzhas Ниоткуда  
Дата: 27.04.10 11:51
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Но так как очередь пустая, то там и оставалось. пришлось добавить выход с этого цикла по условной переменной.


Есть следующий подход : функция dequeue ожидает наступление одного из двух событий:
1. появился новый запрос для выполнения
2. запросили прервать всю работу.
соответственно, код должен быть такой (мета-код):
request* dequeue()
{
  //locks
  while (wait_events(quitEvent, newRequestEvent) == NEW_REQUEST_EVENT)
  {
    return pop_request();
  }
  return 0;
}
Re[6]: реализация читатели-писатель
От: OdesitVadim Украина  
Дата: 27.04.10 12:50
Оценка:
Здравствуйте, remark, Вы писали:

R>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.

если я правильно мануалы вычитал, то она просто рассылает сигнал всем, кто ожидает.
R>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().
R>Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.
стоп, стоп. какая переменная? это же функция как минимум? и если она рассылает сообщение, то она не такая уже и бессмысленная.

R>
Re[7]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 12:53
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

R>>Условная переменная (в отличие от семафора) не несёт никакого состояния, поэтому просто выполнение broadcast() без соотв. модификации состояния не имеет смысла.

OV>если я правильно мануалы вычитал, то она просто рассылает сигнал всем, кто ожидает.
R>>Тебе надо класть в очередь сообщение-маркер и потом делать broadcast(). Либо ждать появления сообщений в очереди ИЛИ установки какого-то, устанавливать флаг и делать broadcast().
R>>Просто broadcast() — бессмысленная переменная, можешь считать её на no-op.

OV>стоп, стоп. какая переменная? это же функция как минимум? и если она рассылает сообщение, то она не такая уже и бессмысленная.


Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно.
Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания.
Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[6]: реализация читатели-писатель
От: посетитель /life/  
Дата: 27.04.10 13:07
Оценка:
Здравствуйте, uzhas, Вы писали:

U>Есть следующий подход : функция dequeue ожидает наступление одного из двух событий:

U>1. появился новый запрос для выполнения
U>2. запросили прервать всю работу.
U>соответственно, код должен быть такой (мета-код):
U>
U>request* dequeue()
U>{
U>  //locks
U>  while (wait_events(quitEvent, newRequestEvent) == NEW_REQUEST_EVENT)
U>  {
U>    return pop_request();
U>  }
U>  return 0;
U>}
U>

А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.
Re[7]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 13:10
Оценка:
Здравствуйте, посетитель /life/, Вы писали:

ПL>А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.


Условная переменная отпускает лок на время ожидания.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[8]: реализация читатели-писатель
От: посетитель /life/  
Дата: 27.04.10 13:13
Оценка:
Здравствуйте, remark, Вы писали:

R>Здравствуйте, посетитель /life/, Вы писали:


ПL>>А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.


R>Условная переменная отпускает лок на время ожидания.


R>


А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?
Re[9]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 13:16
Оценка:
Здравствуйте, посетитель /life/, Вы писали:

ПL>>>А дедлока не будет? Ожидание происходит внутри lock-секции. При вызове enqueue попытка захвата lock'а.


R>>Условная переменная отпускает лок на время ожидания.


ПL>А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?


Но там вроде ничего и не лочится, и ничего и не надо лочить...


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[10]: реализация читатели-писатель
От: посетитель /life/  
Дата: 27.04.10 13:41
Оценка:
Здравствуйте, remark, Вы писали:

ПL>>А где в подходе uzhas'а условная переменная? Мне показалось, что предлагается использовать что-то типа WaitForMultipleObjects, нет?


R>Но там вроде ничего и не лочится,


Мне показалось, что выделенный комментарий означает, что лочится:
U>
U>request* dequeue()
U>{
U>  //locks
U>  while (wait_events(quitEvent, newRequestEvent) == NEW_REQUEST_EVENT)
U>  {
U>    return pop_request();
U>  }
U>  return 0;
U>}
U>


R>и ничего и не надо лочить...

Почему не надо? Ведь тогда enqueue и dequeue будут не синхронизированы.
Я так понял, что событие newRequestEvent используется по аналогии с условной переменной — срабатывает при помещении объекта в пустую очередь. Или там другая схема?
Re[11]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 27.04.10 13:51
Оценка:
Здравствуйте, посетитель /life/, Вы писали:

R>>и ничего и не надо лочить...

ПL>Почему не надо? Ведь тогда enqueue и dequeue будут не синхронизированы.
ПL>Я так понял, что событие newRequestEvent используется по аналогии с условной переменной — срабатывает при помещении объекта в пустую очередь. Или там другая схема?

Предлагаю приостановить обсуждение пока не будет полного кода.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[8]: реализация читатели-писатель
От: OdesitVadim Украина  
Дата: 28.04.10 06:55
Оценка:
Здравствуйте, remark, Вы писали:


R>Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно.

R>Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания.
R>Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.

R>

При таком объяснении, получается, что ввели функцию, которая никому не нужна так как она ничего не делает. Хотя есть маленький кусочек "без изменения состояния"
Мы говорим о разных условных переменных. Я о конкретной реализации в Linux на базе pthread. Вот кусочек описания

pthread_cond_wait() puts the current thread to sleep. It requires a mutex of the associated shared resource value it is waiting on. pthread_cond_signal() signals one thread out of the possibly many sleeping threads to wakeup. pthread_cond_broadcast() signals all threads waiting on the cond condition variable to wakeup. Here is an example on using pthread condition variables:

Откуда получается, что pthread_cond_signal будит один с потоков (произвольный), который ожидает на вызове pthread_cond_wait. А pthread_cond_broadcast — это фактически в рассылка всем тредам, которые висят на pthread_cond_wait сигнала. Меня лично это поведение полностью устраивает.
Re: реализация читатели-писатель
От: artiz  
Дата: 28.04.10 06:57
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

OV>Доброго дня.


OV>Ищется библиотека либо академическая статья, по которой можно написать такое.

OV>есть тред, который читает с сокета и складывает задания в очередь.
OV>Есть какой то пул тредов, которые с очереди забирают, выполяют и отдают результат в основной тред.

OV>Интересует либо реализация под Linux+gcc или хорошая статья с теорией правильной реализации.


Можешь попробовать мой вариант — thread_pool.hpp
Работает под Win/Linux.

Использовать примерно так (код не рабочий, только идея):

void threadProc(int n){
    std::cout << "processed: " << res << std::endl;
}

ThreadPool<int> threadPool(threadProc, 20);

// main work cycle
while (true)
{
    ...
    socket_type clientSock = accept (serverSock, (sockaddr* ) &clientAddr, &clientAddrLen);
    int command = util::readInt (clientSock);
    util::closeSocket (clientSock);

    if (command == EXIT_COMMAND)
       break;

    threadPool.process (command);
}
...
threadPool.stop(true);
...
util::closeSocket (serverSock);
...

Рабочий пример: thread_pool
Re[9]: реализация читатели-писатель
От: remark Россия http://www.1024cores.net/
Дата: 28.04.10 07:08
Оценка:
Здравствуйте, OdesitVadim, Вы писали:

R>>Если предикат уже был установлен, то никакие потоки не ждут, и сигнализировать бессмысленно.

R>>Если предикат не установлен, то сигнализировать без изменения состояния предиката тоже бессмысленно, т.к. никакой поток не выйдет из ожидания.
R>>Сигнал на условной переменной — это всегда сигнал об изменении внешнего состояния.

OV>При таком объяснении, получается, что ввели функцию, которая никому не нужна так как она ничего не делает. Хотя есть маленький кусочек "без изменения состояния"


В этом кусочке и суть.

OV>Мы говорим о разных условных переменных. Я о конкретной реализации в Linux на базе pthread. Вот кусочек описания


OV>

OV>pthread_cond_wait() puts the current thread to sleep. It requires a mutex of the associated shared resource value it is waiting on. pthread_cond_signal() signals one thread out of the possibly many sleeping threads to wakeup. pthread_cond_broadcast() signals all threads waiting on the cond condition variable to wakeup. Here is an example on using pthread condition variables:

OV>Откуда получается, что pthread_cond_signal будит один с потоков (произвольный), который ожидает на вызове pthread_cond_wait. А pthread_cond_broadcast — это фактически в рассылка всем тредам, которые висят на pthread_cond_wait сигнала. Меня лично это поведение полностью устраивает.

Мы говорим об одной и той же условной переменной.
Обрати внимание на кусок "a mutex of the associated shared resource value it is waiting on".
Условная переменная сигнализирует об изменении состояния, если состояние не изменилось, то сигнализирование бессмысленно. Ожидание на условной переменной всегда делается в цикле, который проверяет состояние. При сигнализировании другой поток действительно проснётся, но сразу же увивдит, что состояние не изменилось, и уснёт обратно. Выход нулевой.
Интерфейс POSIX ограничен С, если же ты посмотришь любую нормальную реализацию условных переменных для С++, то они принимают предикат, которого ждём. При использовании такого АПИ, ожидающий поток даже и не выйдёт из фунукции ожидания при сигнализировании без изменения сосотояния предикаита.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[10]: реализация читатели-писатель
От: OdesitVadim Украина  
Дата: 28.04.10 07:36
Оценка:
Здравствуйте, remark, Вы писали:
[scip]
R>Мы говорим об одной и той же условной переменной.
[scip]
R>Интерфейс POSIX ограничен С,
Но работает так как я написал?
R>если же ты посмотришь любую нормальную реализацию условных переменных для С++, то они принимают предикат, которого ждём. При использовании такого АПИ, ожидающий поток даже и не выйдёт из фунукции ожидания при сигнализировании без изменения сосотояния предикаита.
тоесть буста?

R>
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.