Prev: Posting a functor to a window - strange rare crash.
Next: Declaring a dynamic pointer to an array of char pointers
From: Mihajlo Cvetanović on 28 Jan 2010 13:41

First, you need only one critical section, and its name should relate to the data it guards, not to the entities that use it:

    CRITICAL_SECTION csMySharedData;

Your current solution does not prevent one consumer and one producer from accessing the same data at the same time.

Second, the lifetime of the critical section should match the useful lifetime of the data it's supposed to guard. You're safe to call DeleteCriticalSection only when you know for sure that no one will access the data anymore. This is tricky to implement. You must wait for all spawned threads to finish, and delete the CS after that.

Third, it makes little sense to enter the CS, change one bit of guarded data, leave the CS, enter it again, change another bit, and leave again. The rule of thumb is this: it's safe to leave a critical section only if the guarded data is consistent again, like it was when you entered the CS. The double push_back must be in the same CS. Same with the double erase.

And fourth, never sleep or wait for something inside a critical section. You haven't done it, but I have a few times. Working around this rule may complicate the code, but it's essential.
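The first three rules can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses portable C++11, with `std::mutex` standing in for `CRITICAL_SECTION`, and `SharedData`, `add_entry`, `is_consistent` and `run_writers` are hypothetical names that do not appear in the thread.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared data: two vectors that must always have equal size.
struct SharedData {
    std::mutex lock;            // stands in for CRITICAL_SECTION csMySharedData
    std::vector<int> ids;
    std::vector<int> values;
};

// Both push_back calls happen inside one critical section, so no other
// thread can observe ids and values with different sizes.
void add_entry(SharedData& d, int id, int value) {
    std::lock_guard<std::mutex> guard(d.lock);
    d.ids.push_back(id);
    d.values.push_back(value);
}

// Returns true if the invariant holds at the moment of the check.
bool is_consistent(SharedData& d) {
    std::lock_guard<std::mutex> guard(d.lock);
    return d.ids.size() == d.values.size();
}

// Spawns writers, waits for all of them, and only then lets the lock and
// the data go out of scope together. Returns the final entry count.
std::size_t run_writers(int thread_count, int entries_per_thread) {
    SharedData data;
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&data, t, entries_per_thread] {
            for (int i = 0; i < entries_per_thread; ++i) {
                add_entry(data, t, i);
                assert(is_consistent(data));
            }
        });
    }
    for (std::thread& th : threads) th.join();  // no one uses data after this
    return data.ids.size();
}
```

Calling `run_writers(4, 100)` should leave 400 entries. The lock lives inside the same object as the data, so the two are created and destroyed together, and the `join` loop plays the role of waiting for all spawned threads before DeleteCriticalSection.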
From: Larry on 28 Jan 2010 16:03

"Mihajlo Cvetanović" <mcvetanovic(a)gmail> wrote in message
news:ufaFTmEoKHA.1552(a)TK2MSFTNGP04.phx.gbl...

> First, you need only one critical section, and its name should relate to
> the data it guards, not to the entities that use it:
>
> CRITICAL_SECTION csMySharedData;
>
> Your current solution does not prevent one consumer and one producer to
> access the same data at the same time.
>
> Second, the life of critical section should be the same as the useful life
> of data it's supposed to guard. You're safe to call DeleteCriticalSection
> only when you know for sure that no one will access data anymore. This is
> tricky to implement. You must wait for all spawned threads to finish, and
> delete CS after that.

Perfect! I totally got that now. Also, I have made some changes to the previous code so that it should make more sense (it is working!)

    /*
     *
     * Streaming Server v1.1 by THEARTOFWEB Software
     *
     */

    #include <iostream>
    #include <string>
    #include <map>
    #include <algorithm>
    #include <process.h>
    #include <cstdlib>
    #include <ctime>
    #include "socket.h"
    #include <boost/circular_buffer.hpp>
    using namespace std;
    using namespace boost;

    const string CRLF = "\r\n";
    const int numbuff = 3;

    unsigned int __stdcall Consumer(void* sock);
    unsigned int __stdcall Producer(void*);

    void getDateTime(char * szTime);

    enum buffer_status
    {
        BUFF_DONE = 1,
        BUFF_EMPTY = 0
    };

    struct buffer
    {
        unsigned char data[1024];
        int bytesRecorded;
        int flag;
        buffer(const unsigned char * data_, const int bytesRecorded_, const int flag_) :
            bytesRecorded(bytesRecorded_), flag(flag_)
        {
            copy(data_, data_ + bytesRecorded_, data);
        }
    };

    struct circular
    {
        circular_buffer<buffer> cb;
    };

    // Global maps

    map<int, circular> users;
    CRITICAL_SECTION users_mutex;

    map<int, HANDLE> eventi;
    CRITICAL_SECTION eventi_mutex;

    int main()
    {
        // Initialize all critical sections
        InitializeCriticalSection(&users_mutex);
        InitializeCriticalSection(&eventi_mutex);

        // Launch Producer Thread
        unsigned int prodRet;
        _beginthreadex(0,0,Producer,NULL,0,&prodRet);
        if(prodRet)
            cout << "Launched Producer Thread!" << endl;

        // Server.
        // Set up server (port: 8000, maxconn: 10)
        //
        SocketServer sockIn(8000, 10);

        while(1)
        {
            // ...wait for incoming connections...
            Socket* s = sockIn.Accept();

            // Spawn a new Consumer Thread each
            // time a client connects.
            unsigned int sockRet;
            _beginthreadex(0,0,Consumer,s,0,&sockRet);
            if(sockRet)
                cout << "Spawned a new thread!" << endl;
        }

        DeleteCriticalSection(&users_mutex);
        DeleteCriticalSection(&eventi_mutex);

        sockIn.Close();

        return EXIT_SUCCESS;
    }

    // Consumer Thread
    unsigned int __stdcall Consumer(void* sock)
    {
        Socket* s = (Socket*) sock;

        s->SendBytes("Hello World!" + CRLF);

        int threadid = (int)GetCurrentThreadId();

        // Create Event && Push it in the event map
        HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);

        EnterCriticalSection(&eventi_mutex);
        eventi.insert(make_pair(threadid,hevent));
        LeaveCriticalSection(&eventi_mutex);

        // Prepare && Add circular buffer to the map
        circular c;
        c.cb.set_capacity(numbuff);

        for(int i = 0; i<numbuff; i++)
        {
            c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
        }

        EnterCriticalSection(&users_mutex);
        users.insert(make_pair(threadid, c));
        LeaveCriticalSection(&users_mutex);

        //
        // Read data from the buffer
        // and send it to the client
        //
        // When using push_back the oldest
        // element in the circular buffer
        // will be in the index 0
        //

        Sleep(500);

        while(1)
        {
            // CALLBACK EVENT
            WaitForSingleObject(eventi[threadid], INFINITE);

            if(users[threadid].cb.at(0).flag == BUFF_DONE)
            {
                string line = (char*)users[threadid].cb.at(0).data;
                int ret = s->SendBytes(line + CRLF);
                if(SOCKET_ERROR == ret)
                    break;
            }
        }

        // Close & remove event from event map
        CloseHandle(eventi[threadid]);

        EnterCriticalSection(&eventi_mutex);
        eventi.erase(threadid);
        LeaveCriticalSection(&eventi_mutex);

        // Remove buffer from the users map
        EnterCriticalSection(&users_mutex);
        users.erase(threadid);
        LeaveCriticalSection(&users_mutex);

        // Say bye to the client
        s->SendBytes("Bye bye!" + CRLF);

        // Disconnect client
        cout << "Closing thread..." << endl;
        s->Close();
        delete s;
        return 0;
    }

    // Producer Thread
    unsigned int __stdcall Producer(void*)
    {
        while(1)
        {
            Sleep(1000);
            char szTime[30];
            getDateTime(szTime);

            map<int, circular>::iterator uit;

            EnterCriticalSection(&users_mutex);
            EnterCriticalSection(&eventi_mutex);

            for(uit=users.begin(); uit!=users.end(); ++uit)
            {
                users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30, BUFF_DONE));

                if(eventi[uit->first])
                    SetEvent(eventi[uit->first]);

                cout << "Producer is writing to: " << uit->first << endl;
            }

            LeaveCriticalSection(&eventi_mutex);
            LeaveCriticalSection(&users_mutex);
        }
        return 0;
    }

    void getDateTime(char * szTime)
    {
        time_t rawtime = time(NULL);
        struct tm timeinfo;
        gmtime_s(&timeinfo, &rawtime);
        strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
    }

thanks
From: Ulrich Eckhardt on 29 Jan 2010 03:17

Larry wrote:
> map<int, circular> users;
> CRITICAL_SECTION users_mutex;
>
> map<int, HANDLE> eventi;
> CRITICAL_SECTION eventi_mutex;

This looks much better, it makes the association immediately clear.

> EnterCriticalSection(&eventi_mutex);
> eventi.insert(make_pair(threadid,hevent));
> LeaveCriticalSection(&eventi_mutex);

Access to 'eventi' is guarded by the critical section, correct. BTW: You can also write

    eventi[threadid] = hevent;

> // CALLBACK EVENT
> WaitForSingleObject(eventi[threadid], INFINITE);

Wait:
1. Access to 'eventi' is not inside a critical section.
2. You could simply use 'hevent'.

> if(users[threadid].cb.at(0).flag == BUFF_DONE)
> {
> string line = (char*)users[threadid].cb.at(0).data;

Similarly, access to 'users' is not guarded here!

> // Close & remove event from event map
> CloseHandle(eventi[threadid]);
>
> EnterCriticalSection(&eventi_mutex);
> eventi.erase(threadid);
> LeaveCriticalSection(&eventi_mutex);

Imagine the producer creating some content, then the consumer destroys the event object, then the producer tries to signal the event. Remove the entry first and close the handle afterwards:

    EnterCriticalSection(&eventi_mutex);
    eventi.erase(threadid);
    LeaveCriticalSection(&eventi_mutex);
    CloseHandle(hevent);

> EnterCriticalSection(&users_mutex);
> EnterCriticalSection(&eventi_mutex);

Here is one possible problem, which is called "deadlock". Imagine one thread locking 'users_mutex' and another locking 'eventi_mutex'. Now, both threads try to lock the other mutex. Neither thread will ever get the lock. Mihajlo mentioned that you shouldn't sleep with a lock held. Actually, you also shouldn't try to acquire another lock with a lock already held!

There is one easy fix: Create a single map from thread-ID to the shared structure of both threads. The shared structure contains both the IO buffers and the event to wake up the consumer. The consumer is also responsible for creating the entry when it's ready and removing it when finished.

Uli

--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Managing Director: Thorsten Föcking, Amtsgericht Hamburg HR B62 932
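The single-map fix described above might look something like the sketch below. It is an assumption-laden illustration, not the code anyone in the thread wrote: `Session` and `SessionTable` are hypothetical names, `std::mutex` stands in for `CRITICAL_SECTION`, and a plain `bool` stands in for the Win32 event handle so that the example stays portable.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical per-consumer state: the buffers and the wake-up flag live
// together, so they are inserted and erased as one unit.
struct Session {
    std::deque<std::string> pending;  // data produced but not yet sent
    bool wake = false;                // stands in for the Win32 event HANDLE
};

class SessionTable {
public:
    // Called by the consumer once it is ready to receive data.
    void add(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        sessions_[id];  // default-constructs an empty Session
    }

    // Called by the consumer when it is done; buffer and flag vanish together.
    void remove(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        sessions_.erase(id);
    }

    // Called by the producer; returns how many sessions received the text.
    std::size_t broadcast(const std::string& text) {
        std::lock_guard<std::mutex> guard(lock_);
        for (auto& entry : sessions_) {
            entry.second.pending.push_back(text);
            entry.second.wake = true;
        }
        return sessions_.size();
    }

    // Moves everything pending for one session out of the table, so the
    // caller can send it without holding the lock.
    std::vector<std::string> take_all(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        std::vector<std::string> out;
        auto it = sessions_.find(id);
        if (it == sessions_.end()) return out;
        out.assign(it->second.pending.begin(), it->second.pending.end());
        it->second.pending.clear();
        it->second.wake = false;
        return out;
    }

private:
    std::mutex lock_;                  // the only lock, so no lock ordering
    std::map<int, Session> sessions_;
};
```

With only one lock there is no second lock to acquire while holding the first, which removes the deadlock scenario. Because the buffer and the wake-up flag are erased in one step, the producer can never find one without the other.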
From: Larry on 29 Jan 2010 04:37

"Ulrich Eckhardt" <eckhardt(a)satorlaser.com> wrote in message
news:i5ia37-kp1.ln1(a)satorlaser.homedns.org...

> This looks much better, it makes the association immediately clear.

>> // CALLBACK EVENT
>> WaitForSingleObject(eventi[threadid], INFINITE);
>
> Wait:
> 1. Access to 'eventi' is not inside a critical section.
> 2. You could simply use 'hevent'.
>
>> if(users[threadid].cb.at(0).flag == BUFF_DONE)
>> {
>> string line = (char*)users[threadid].cb.at(0).data;
>
> Similarly, access to 'users' is not guarded here!

I think the following should be a little better...

    while(1)
    {
        // CALLBACK EVENT
        WaitForSingleObject(hevent, INFINITE);

        EnterCriticalSection(&users_mutex);
        int flag = users[threadid].cb.at(0).flag;
        string line = (char*)users[threadid].cb.at(0).data;
        LeaveCriticalSection(&users_mutex);

        if(flag == BUFF_DONE)
        {
            int ret = s->SendBytes(line + CRLF);
            if(SOCKET_ERROR == ret)
                break;
        }
    }

> There is one easy fix: Create a single map from thread-ID to the shared
> structure of both threads. The shared structure contains both the IO
> buffers and the event to wake up the consumer. The consumer is also
> responsible for creating the entry when it's ready and removing it when
> finished.

Ok. I will consider that.

thanks
From: Mihajlo Cvetanović on 29 Jan 2010 05:53

One more bug: you need to change the order of the two erase calls at the end of the Consumer function (or put both calls inside one and the same critical section). Here's the explanation:

In one instance you got lucky, but you may not be lucky next time. The Producer iterates through the users map, and if an element exists it accesses the corresponding element in the eventi map. All fine, because the Consumer creates the element in the eventi map before creating the corresponding element in the users map. Had it been the other way around (first users.insert, then eventi.insert) you would have one more opportunity for a strange and rare crash.

Actually, "next time" has just come. When the Consumer leaves its main loop it erases the element from the eventi map and then also from the users map. But in between, the Producer may find this element in its loop, feed some data into the users map, and call SetEvent with a non-existent element! SetEvent may or may not crash; I don't know, and I wouldn't dare to test it in a working environment.

And one more thing: you need ResetEvent after the Consumer "consumes", otherwise Consumers will never wait, and will consume the same data over and over until the Producer gives them another bone to chew. ResetEvent must also be called inside the CS, because it is part of data consistency. But this is tricky, because you must not reset the event if there is data to consume. So after WFSO and inside the CS you must call ResetEvent only after verifying that there is no more data to consume.

And one more problem connected with the previous one, a race condition: the Producer may produce faster than the Consumer can consume (even with Sleep(1000) you are not really safe). The effect: some data is not sent, because the Consumer sends only the most recently produced data. The fix: in the Consumer, erase all consumed data, consume from the end instead of from the beginning, and don't ResetEvent if there is more data to consume.
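One way to get the "erase what was consumed, and only go back to waiting when nothing is left" behaviour is sketched below. It swaps in a different technique from the one in the thread: a `std::condition_variable` with a predicate replaces the Win32 event, so the decision to keep waiting is made from the queue contents under the lock and no separate reset call is needed. `Channel`, `wait_and_drain` and `count_received` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical channel between one producer and one consumer.
class Channel {
public:
    void push(const std::string& item) {
        {
            std::lock_guard<std::mutex> guard(lock_);
            pending_.push_back(item);
        }
        ready_.notify_one();
    }

    // Tells the consumer that no more data will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> guard(lock_);
            closed_ = true;
        }
        ready_.notify_one();
    }

    // Blocks until data is pending or the channel is closed, then removes
    // and returns everything pending. An empty result means "closed".
    std::vector<std::string> wait_and_drain() {
        std::unique_lock<std::mutex> guard(lock_);
        ready_.wait(guard, [this] { return !pending_.empty() || closed_; });
        std::vector<std::string> out(pending_.begin(), pending_.end());
        pending_.clear();
        return out;
    }

private:
    std::mutex lock_;
    std::condition_variable ready_;
    std::deque<std::string> pending_;
    bool closed_ = false;
};

// A producer that never sleeps, paired with a consumer that drains in
// batches. Returns how many items the consumer saw.
std::size_t count_received(int item_count) {
    Channel channel;
    std::thread producer([&channel, item_count] {
        for (int i = 0; i < item_count; ++i) channel.push("item");
        channel.close();
    });
    std::size_t received = 0;
    for (;;) {
        std::vector<std::string> batch = channel.wait_and_drain();
        if (batch.empty()) break;
        received += batch.size();  // a real consumer would send here, unlocked
    }
    producer.join();
    return received;
}
```

Even though the producer here runs flat out, every item pushed before `close` is either still pending or already drained, so nothing is skipped. The consumer only blocks again once the queue is empty, and the actual sending happens outside the lock.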