From: Mihajlo Cvetanović on
First, you need only one critical section, and its name should relate to
the data it guards, not to the entities that use it:

CRITICAL_SECTION csMySharedData;

Your current solution does not prevent a consumer and a producer from
accessing the same data at the same time.

Second, the lifetime of the critical section should match the useful
lifetime of the data it guards. It is safe to call
DeleteCriticalSection only when you know for sure that no one will
access the data anymore. This is tricky to implement: you must wait for
all spawned threads to finish, and delete the CS only after that.
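
A minimal sketch of that ordering, written in portable standard C++ rather than the Win32 API: std::mutex stands in for CRITICAL_SECTION, and Counter and runWorkers are hypothetical names invented for the illustration.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical guarded data; std::mutex stands in for CRITICAL_SECTION.
struct Counter {
    std::mutex guard;
    int value = 0;
};

// Runs `workers` threads that each add `perWorker` increments, waits for
// all of them, and only then lets the lock and the data go out of scope.
int runWorkers(int workers, int perWorker) {
    assert(workers >= 0 && perWorker >= 0);
    Counter counter;                       // lock and data are created together
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) {
        threads.emplace_back([&counter, perWorker] {
            for (int n = 0; n < perWorker; ++n) {
                std::lock_guard<std::mutex> lock(counter.guard);
                ++counter.value;
            }
        });
    }
    // Wait for every spawned thread before the lock can be destroyed.
    // With the Win32 API this is where you would wait on the thread
    // handles and call DeleteCriticalSection afterwards.
    for (std::thread& t : threads)
        t.join();
    return counter.value;                  // no thread can touch `counter` now
}
```

Calling runWorkers(4, 1000) should give 4000, because every increment happens under the lock and the result is read only after all threads have been joined.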

Third, it makes little sense to enter the CS, change one piece of the
guarded data, leave the CS, enter it again, change another piece, and
leave again. The rule of thumb is this: it's safe to leave a critical
section only when the guarded data is consistent again, as it was when
you entered the CS. The double push_back must be inside a single CS, and
so must the double erase.

And fourth, never sleep or wait for something inside a critical section.
You haven't done it, but I have a few times. Obeying this rule may
complicate the code, but it's essential.
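
One common way to obey the fourth rule is to copy what you need while holding the lock and do the slow work after releasing it. The sketch below is standard C++ with std::mutex standing in for CRITICAL_SECTION; Mailbox, slowSend and sendLatest are hypothetical names, and slowSend only imitates a blocking socket call.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical shared message.
struct Mailbox {
    std::mutex guard;
    std::string latest;
};

// Stand-in for a slow, blocking operation such as a socket send.
std::size_t slowSend(const std::string& text) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    return text.size();
}

// Copy under the lock, release the lock, and only then block.
std::size_t sendLatest(Mailbox& box) {
    std::string copy;
    {
        std::lock_guard<std::mutex> lock(box.guard);
        copy = box.latest;
    }   // lock released here, before anything that can block
    const std::size_t sent = slowSend(copy);
    assert(sent == copy.size());   // the stand-in always sends everything
    return sent;
}
```

The price is an extra copy of the data, which is usually far cheaper than stalling every other thread behind a slow send.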
From: Larry on

"Mihajlo Cvetanović" <mcvetanovic(a)gmail> wrote in message
news:ufaFTmEoKHA.1552(a)TK2MSFTNGP04.phx.gbl...
> First, you need only one critical section, and its name should relate to
> the data it guards, not to the entities that use it:
>
> CRITICAL_SECTION csMySharedData;
>
> Your current solution does not prevent a consumer and a producer from
> accessing the same data at the same time.
>
> Second, the lifetime of the critical section should match the useful
> lifetime of the data it guards. It is safe to call DeleteCriticalSection
> only when you know for sure that no one will access the data anymore. This
> is tricky to implement: you must wait for all spawned threads to finish,
> and delete the CS only after that.

Perfect! I've got that now. I have also made some changes to the
previous code so that it makes more sense (and it is working!)

/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
    BUFF_DONE = 1,
    BUFF_EMPTY = 0
};

struct buffer
{
    unsigned char data[1024];
    int bytesRecorded;
    int flag;
    buffer(const unsigned char * data_, const int bytesRecorded_, const int flag_) :
        bytesRecorded(bytesRecorded_), flag(flag_)
    {
        copy(data_, data_ + bytesRecorded_, data);
    }
};

struct circular
{
    circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
CRITICAL_SECTION users_mutex;

map<int, HANDLE> eventi;
CRITICAL_SECTION eventi_mutex;

int main()
{
    // Initialize all critical sections
    InitializeCriticalSection(&users_mutex);
    InitializeCriticalSection(&eventi_mutex);

    // Launch Producer Thread
    unsigned int prodRet;
    _beginthreadex(0,0,Producer,NULL,0,&prodRet);
    if(prodRet)
        cout << "Launched Producer Thread!" << endl;

    // Server.
    // Set up server (port: 8000, maxconn: 10)
    //
    SocketServer sockIn(8000, 10);

    while(1)
    {
        // ...wait for incoming connections...
        Socket* s = sockIn.Accept();

        // Spawn a new Consumer Thread each
        // time a client connects.
        unsigned int sockRet;
        _beginthreadex(0,0,Consumer,s,0,&sockRet);
        if(sockRet)
            cout << "Spawned a new thread!" << endl;

    }

    DeleteCriticalSection(&users_mutex);
    DeleteCriticalSection(&eventi_mutex);

    sockIn.Close();

    return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
    Socket* s = (Socket*) sock;

    s->SendBytes("Hello World!" + CRLF);

    int threadid = (int)GetCurrentThreadId();

    // Create Event && Push it in the event map
    HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);

    EnterCriticalSection(&eventi_mutex);
    eventi.insert(make_pair(threadid,hevent));
    LeaveCriticalSection(&eventi_mutex);

    // Prepare && Add circular buffer to the map
    circular c;
    c.cb.set_capacity(numbuff);

    for(int i = 0; i<numbuff; i++)
    {
        c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
    }

    EnterCriticalSection(&users_mutex);
    users.insert(make_pair(threadid, c));
    LeaveCriticalSection(&users_mutex);

    //
    // Read data from the buffer
    // and send it to the client
    //
    // When using push_back the oldest
    // element in the circular buffer
    // will be in the index 0
    //

    Sleep(500);

    while(1)
    {
        // CALLBACK EVENT
        WaitForSingleObject(eventi[threadid], INFINITE);

        if(users[threadid].cb.at(0).flag == BUFF_DONE)
        {
            string line = (char*)users[threadid].cb.at(0).data;
            int ret = s->SendBytes(line + CRLF);
            if(SOCKET_ERROR == ret)
                break;
        }
    }

    // Close & remove event from event map
    CloseHandle(eventi[threadid]);

    EnterCriticalSection(&eventi_mutex);
    eventi.erase(threadid);
    LeaveCriticalSection(&eventi_mutex);

    // Remove buffer from the users map
    EnterCriticalSection(&users_mutex);
    users.erase(threadid);
    LeaveCriticalSection(&users_mutex);

    // Say bye to the client
    s->SendBytes("Bye bye!" + CRLF);

    // Disconnect client
    cout << "Closing thread..." << endl;

    s->Close();
    delete s;
    return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
    while(1)
    {
        Sleep(1000);
        char szTime[30]; getDateTime(szTime);
        map<int, circular>::iterator uit;

        EnterCriticalSection(&users_mutex);
        EnterCriticalSection(&eventi_mutex);

        for(uit=users.begin(); uit!=users.end(); ++uit)
        {
            users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30, BUFF_DONE));

            if(eventi[uit->first])
                SetEvent(eventi[uit->first]);

            cout << "Producer is writing to: " << uit->first << endl;
        }

        LeaveCriticalSection(&eventi_mutex);
        LeaveCriticalSection(&users_mutex);
    }
    return 0;
}

void getDateTime(char * szTime)
{
    time_t rawtime = time(NULL);
    struct tm timeinfo;
    gmtime_s(&timeinfo, &rawtime);
    strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}

thanks

From: Ulrich Eckhardt on
Larry wrote:
> map<int, circular> users;
> CRITICAL_SECTION users_mutex;
>
> map<int, HANDLE> eventi;
> CRITICAL_SECTION eventi_mutex;

This looks much better, it makes the association immediately clear.

> EnterCriticalSection(&eventi_mutex);
> eventi.insert(make_pair(threadid,hevent));
> LeaveCriticalSection(&eventi_mutex);

Access to 'eventi' guarded in critical section, correct.

BTW: You can also write

eventi[threadid] = hevent;


> // CALLBACK EVENT
> WaitForSingleObject(eventi[threadid], INFINITE);

Wait:
1. Access to 'eventi' is not inside a critical section.
2. You could simply use 'hevent'.

> if(users[threadid].cb.at(0).flag == BUFF_DONE)
> {
> string line = (char*)users[threadid].cb.at(0).data;

Similarly, access to 'users' is not guarded here!

> // Close & remove event from event map
> CloseHandle(eventi[threadid]);
>
> EnterCriticalSection(&eventi_mutex);
> eventi.erase(threadid);
> LeaveCriticalSection(&eventi_mutex);

Imagine the producer creates some content, then the consumer destroys the
event object, and then the producer tries to signal the event. Remove the
entry from the map first and close the handle only afterwards:

EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
LeaveCriticalSection(&eventi_mutex);
CloseHandle(hevent);
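
The same "unpublish first, destroy second" ordering can be sketched in portable C++. Everything here is a stand-in invented for the illustration: Event imitates a Win32 event HANDLE, Registry imitates the eventi map together with its lock, and the function names are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Hypothetical stand-in for a Win32 event HANDLE.
struct Event {
    bool open = true;   // false once "closed"
    int signals = 0;    // number of times it was signalled
};

struct Registry {
    std::mutex guard;                 // stands in for eventi_mutex
    std::map<int, Event*> events;     // stands in for the eventi map
};

void registerEvent(Registry& reg, int id, Event& ev) {
    std::lock_guard<std::mutex> lock(reg.guard);
    reg.events[id] = &ev;
}

// Producer side: look up and signal under the lock. find() is used so
// that a missing id does not insert an empty entry the way operator[] does.
bool signalEvent(Registry& reg, int id) {
    std::lock_guard<std::mutex> lock(reg.guard);
    std::map<int, Event*>::iterator it = reg.events.find(id);
    if (it == reg.events.end())
        return false;
    assert(it->second->open);   // whatever is still published must be usable
    ++it->second->signals;
    return true;
}

// Consumer side: unpublish first, destroy second.
void unregisterAndClose(Registry& reg, int id, Event& ev) {
    {
        std::lock_guard<std::mutex> lock(reg.guard);
        reg.events.erase(id);
    }
    ev.open = false;            // stands in for CloseHandle(hevent)
}
```

Once unregisterAndClose has erased the entry, signalEvent can no longer reach the event, so closing it afterwards cannot race with the producer.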


> EnterCriticalSection(&users_mutex);
> EnterCriticalSection(&eventi_mutex);

Here is one possible problem, which is called "deadlock". Imagine one thread
locking 'users_mutex' and another locking 'eventi_mutex'. Now, both threads
try to lock the other mutex. Neither thread will ever get the lock. Mihajlo
mentioned that you shouldn't sleep with a lock held. Actually, you also
shouldn't try to acquire another lock with a lock already held!
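
If two locks really have to be held at the same time, one option in standard C++ is to acquire them in a single step with std::lock, which is specified to avoid deadlock regardless of argument order. This is a different technique from the Win32 critical sections used in the thread, shown only as a sketch; Tables and both functions are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Hypothetical pair of tables, each with its own lock.
struct Tables {
    std::mutex usersGuard;
    std::map<int, int> users;     // id -> item count (made-up payload)
    std::mutex eventsGuard;
    std::map<int, int> events;    // id -> signal count (made-up payload)
};

// Takes both locks in one step; neither is held while waiting for the other.
void produceFor(Tables& t, int id) {
    std::unique_lock<std::mutex> lockUsers(t.usersGuard, std::defer_lock);
    std::unique_lock<std::mutex> lockEvents(t.eventsGuard, std::defer_lock);
    std::lock(lockUsers, lockEvents);
    ++t.users[id];
    ++t.events[id];
    assert(t.users[id] == t.events[id]);   // both tables stay in step
}

// Deliberately names the locks in the opposite order; std::lock still
// prevents the classic two-lock deadlock.
void removeFor(Tables& t, int id) {
    std::unique_lock<std::mutex> lockEvents(t.eventsGuard, std::defer_lock);
    std::unique_lock<std::mutex> lockUsers(t.usersGuard, std::defer_lock);
    std::lock(lockEvents, lockUsers);
    t.events.erase(id);
    t.users.erase(id);
}
```

With plain EnterCriticalSection calls the equivalent discipline would be to take the two critical sections in the same order everywhere.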

There is one easy fix: Create a single map from thread-ID to the shared
structure of both threads. The shared structure contains both the IO
buffers and the event to wake up the consumer. The consumer is also
responsible for creating the entry when it's ready and removing it when
finished.
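
A rough sketch of that single-map layout, in portable C++ with std::mutex in place of CRITICAL_SECTION. Channel, Server and the three functions are hypothetical names; a std::deque stands in for the circular buffer and a bool for the event handle.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <string>

// Everything one consumer shares with the producer lives in one struct.
struct Channel {
    std::deque<std::string> pending;  // stands in for the circular buffer
    bool signaled = false;            // stands in for the event HANDLE
};

struct Server {
    std::mutex guard;                 // the only lock
    std::map<int, Channel> channels;  // consumer id -> its shared state
};

// Called by the consumer when it is ready to receive data.
void openChannel(Server& s, int id) {
    std::lock_guard<std::mutex> lock(s.guard);
    const bool inserted = s.channels.emplace(id, Channel()).second;
    assert(inserted);                 // each id is registered only once
    (void)inserted;
}

// Called by the consumer when it is finished.
void closeChannel(Server& s, int id) {
    std::lock_guard<std::mutex> lock(s.guard);
    s.channels.erase(id);
}

// Called by the producer: buffer and wake-up flag change together, and
// returns how many consumers received the message.
std::size_t broadcast(Server& s, const std::string& msg) {
    std::lock_guard<std::mutex> lock(s.guard);
    for (std::map<int, Channel>::iterator it = s.channels.begin();
         it != s.channels.end(); ++it) {
        it->second.pending.push_back(msg);
        it->second.signaled = true;
    }
    return s.channels.size();
}
```

Because there is only one map and one lock, the producer can never find a buffer without its event or the other way round, and there is no second lock to deadlock on.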


Uli


From: Larry on
"Ulrich Eckhardt" <eckhardt(a)satorlaser.com> wrote in message
news:i5ia37-kp1.ln1(a)satorlaser.homedns.org...

> This looks much better, it makes the association immediately clear.

>> // CALLBACK EVENT
>> WaitForSingleObject(eventi[threadid], INFINITE);
>
> Wait:
> 1. Access to 'eventi' is not inside a critical section.
> 2. You could simply use 'hevent'.
>
>> if(users[threadid].cb.at(0).flag == BUFF_DONE)
>> {
>> string line = (char*)users[threadid].cb.at(0).data;
>
> Similarly, access to 'users' is not guarded here!

I think the following should be a little better...

while(1)
{
    // CALLBACK EVENT
    WaitForSingleObject(hevent, INFINITE);

    EnterCriticalSection(&users_mutex);
    int flag = users[threadid].cb.at(0).flag;
    string line = (char*)users[threadid].cb.at(0).data;
    LeaveCriticalSection(&users_mutex);

    if(flag == BUFF_DONE)
    {
        int ret = s->SendBytes(line + CRLF);
        if(SOCKET_ERROR == ret)
            break;
    }
}

> There is one easy fix: Create a single map from thread-ID to the shared
> structure of both threads. The shared structure contains both the IO
> buffers and the event to wake up the consumer. The consumer is also
> responsible for creating the entry when it's ready and removing it when
> finished.

Ok. I will consider that.

thanks

From: Mihajlo Cvetanović on
One more bug: you need to swap the order of the two erase calls at the
end of the Consumer function (or put both calls inside one and the same
critical section). Here's the explanation:

In one instance you got lucky, but you may not be lucky next time. The
Producer iterates through the users map, and for each element it finds
it accesses the corresponding element in the eventi map. That works,
because the Consumer creates the element in the eventi map before
creating the corresponding element in the users map. Had it been the
other way around (first users.insert, then eventi.insert) you would have
one more opportunity for a strange and rare crash.

Actually, "next time" has just come. When the Consumer leaves its main
loop it erases the element from the eventi map and then from the users
map. In between, the Producer may still find this element in its loop,
feed some data into the users map, and call SetEvent for an element that
no longer exists! SetEvent may or may not crash; I don't know, and I
would not dare to test it in a production environment.

And one more thing: if the event is a manual-reset event, you need
ResetEvent after the Consumer "consumes"; otherwise Consumers will never
wait and will consume the same data over and over until the Producer
gives them another bone to chew. (An auto-reset event, which is what
CreateEvent(NULL,FALSE,FALSE,NULL) creates, is reset automatically when
a waiting thread is released.) ResetEvent must also be called inside the
CS, because it is part of data consistency. This is tricky, because you
must not reset the event while there is data left to consume. So after
WFSO, and inside the CS, call ResetEvent only after verifying that there
is no more data to consume.

And one more problem, connected with the previous one, a race condition:
the Producer may produce faster than the Consumer can consume (even with
Sleep(1000) you are not really safe). The effect: some data is never
sent, because the Consumer sends only the most recently produced data.
The fix: in the Consumer, erase all consumed data, consume from the end
instead of from the beginning, and don't ResetEvent if there is more
data to consume.
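
One way to get the "consume everything, then wait again" behaviour in portable C++ is a condition variable with a predicate, which swaps in a different mechanism for the Win32 event and ResetEvent. The sketch below is only an illustration under that assumption; Feed, publish, closeFeed and takeAll are hypothetical names, and the batch is delivered oldest first.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical queue shared by one producer and one consumer.
struct Feed {
    std::mutex guard;
    std::condition_variable ready;    // plays the role of the event
    std::deque<std::string> pending;  // data not yet consumed
    bool closed = false;
};

// Producer: add data under the lock, then wake the consumer.
void publish(Feed& f, std::string msg) {
    {
        std::lock_guard<std::mutex> lock(f.guard);
        f.pending.push_back(std::move(msg));
    }
    f.ready.notify_one();
}

// Tells the consumer that no more data will arrive.
void closeFeed(Feed& f) {
    {
        std::lock_guard<std::mutex> lock(f.guard);
        f.closed = true;
    }
    f.ready.notify_one();
}

// Consumer: blocks until there is data or the feed is closed, then takes
// every pending item in one go. wait() releases the lock while blocked,
// so this does not sleep inside the critical section.
std::vector<std::string> takeAll(Feed& f) {
    std::unique_lock<std::mutex> lock(f.guard);
    f.ready.wait(lock, [&f] { return !f.pending.empty() || f.closed; });
    std::vector<std::string> batch(f.pending.begin(), f.pending.end());
    f.pending.clear();                // consumed data is erased
    assert(!batch.empty() || f.closed);
    return batch;
}
```

Since the predicate is re-checked under the lock, the consumer never goes back to sleep while data is waiting, and a producer that runs ahead only makes the next batch longer instead of losing items.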