From: Ulrich Eckhardt on
Larry wrote:
> "Ulrich Eckhardt" <eckhardt(a)satorlaser.com> ha scritto nel messaggio
> news:k4r537-krs.ln1(a)satorlaser.homedns.org...
>
>> Which error exactly?
>
> Expression: map/set iterator not incrementable

This can have three causes:
0. You are simply walking off the end of the container with an iterator.
1. You are accessing the container from different threads. This can lead to
all kinds of corruption, and getting a clear error description like the
above is luck, but not impossible.
2. You have a shared container and iterator into the container. When
removing an element, the iterator becomes invalid if it points to that
element. Forgetting to reset it and then using it is what can cause this
problem.


> by the way, is it true that under windows system I should shy away form
> using callback event rather I should be using boost::thread?

Regardless of the programming environment, you should make an informed
decision. This statement above could only be the result of weighing
different aspects of each approach against each other, but without those
the statement can neither be called true nor false.

That said, you can do a few things with directly using the win32 API that
you can't do with Boost.Thread, precisely because those things are not
portable. Most notably WaitForMultipleObjects() is not portable but very
useful.

Uli

--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Geschäftsführer: Thorsten Föcking, Amtsgericht Hamburg HR B62 932
From: Larry on
"Scott McPhillips [MVP]" <org-dot-mvps-at-scottmcp> ha scritto nel messaggio
news:uxenI48nKHA.4628(a)TK2MSFTNGP06.phx.gbl...

> No, use CRITICAL_SECTION. It has less overhead.
>
> After initializing it (InitializeCriticalSection(&cs)), surround all
> accesses to data shared between threads like this:
>
> EnterCriticalSection(&cs);
> ...access or change shared data
> LeaveCriticalSection(&cs);
>
> The first statement suspends the calling thread if another thread is
> "inside" a similar code block. When the other thread does the Leave..,.
> call the first thread is allowed to proceed.

Ok! I am going to need a little help from you though...here's a typical
scenario of my app:

// Buffer structs "circular"

struct buffer
{
unsigned char data[1024]; // binary data container
int bytesRecorded; // bytes actually written (to read)
int flag; // Empty/filled (enum)
buffer(...); // constructor()
};

struct circular
{
circular_buffer<buffer> cb; // Cicular buffer based on "buffer"
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;

int main()
{
// TODO: launch Producer thread.
(...)

// TODO: Set up server and accept incoming
// requests. Each time a client connects
// spawn a new Consumer thread
(...)

return 0; // never for the moment!
}

// Thread Consumer is launched
// anytime a client conencts.

thread Consumer
{
int threadid = (int)GetCurrentThreadId();

// Create event & push it in the "eventi" map
//
HANDLE hevent = CreateEvent(NULL,0,0,NULL);
eventi.insert(make_pair(threadid, hevent));

// Prepare & add circular buffer to the "users" map
//
circular c;
users.insert(make_pair(threadid, c));

// Loop...
//
while(1)
{
// Callback event
WaitForSingleObject(eventi[threadid], INFINITE);

// TODO: read buffer from "users" map
// and send it to the client. If the
// client disconnect break the while loop
}

// Close & remove event from event map
//
CloseHandle(eventi[threadid]);
eventi.erase(threadid);

// Remove buffer from the map
//
users.erase(threadid);

return 0;
}

// Thread Producer is launched
// once. Its main goal is to write
// the same data to all the Consumer
// threads.

thread Producer
{
while(1)
{
// TODO: write on every
// element of map "users"
//
for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer(...));

// Signal event in the Cosumer
// thread (buffer written).
//
if(eventi[uit->first])
SetEvent(eventi[uit->first]);
}
}
return 0;
}

thanks

From: Larry on
"Larry" <dontmewithme(a)got.it> ha scritto nel messaggio
news:4b619efe$0$1101$4fafbaef(a)reader4.news.tin.it...

> Ok! I am going to need a little help from you though...here's a typical
> scenario of my app:

wow...I am getting a lot of unhandle exception (writing location)


/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};

struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int
flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};

struct circular
{
circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;
map<int, HANDLE>::iterator eit;

// Declare Procuder && Cosumer CS

CRITICAL_SECTION csProducer;
CRITICAL_SECTION csConsumer;

int main()
{
// Initialize the critical section
InitializeCriticalSection(&csProducer);

// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);

// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();

// Initialize the critical section
InitializeCriticalSection(&csConsumer);

// Spawn a new Consumr Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
}

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
eventi.insert(make_pair(threadid,hevent));

// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

users.insert(make_pair(threadid, c));

//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);

if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);

// Request ownership of the critical section.
EnterCriticalSection(&csConsumer);

eventi.erase(threadid);

// Release ownership of the critical section.
LeaveCriticalSection(&csConsumer);

// Remove buffer from the map
users.erase(threadid);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;
s->Close();
delete s;
return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);

for(uit=users.begin(); uit!=users.end(); ++uit)
{
// Request ownership of the critical section.
EnterCriticalSection(&csProducer);

users[uit->first].cb.push_back(buffer((unsigned char*)szTime,
30, BUFF_DONE));

if(eventi[uit->first])
SetEvent(eventi[uit->first]);

// Release ownership of the critical section.
LeaveCriticalSection(&csProducer);

cout << "Producer is writing to: " << uit->first << endl;
}

}
return 0;
}

void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}

From: Ulrich Eckhardt on
Larry wrote:
> // Declare Procuder && Cosumer CS
>
> CRITICAL_SECTION csProducer;
> CRITICAL_SECTION csConsumer;

Okay. Nice. Now, the 1M$ question is: What data structures are those
critical sections supposed to guard? Defining this is crucial, because
whether you use them correctly depends on exactly that.

> // Initialize the critical section
> InitializeCriticalSection(&csProducer);
>
> // Launch Producer Thread
> unsigned int prodRet;
> _beginthreadex(0,0,Producer,NULL,0,&prodRet);
> if(prodRet)
> cout << "Launched Producer Thread!" << endl;
>
> // Release resources used by the critical section object.
> DeleteCriticalSection(&csProducer);

Wrapped in init/delete calls.

> // Initialize the critical section
> InitializeCriticalSection(&csConsumer);
>
> // Spawn a new Consumr Thread each
> // time a client connects.
> unsigned int sockRet;
> _beginthreadex(0,0,Consumer,s,0,&sockRet);
> if(sockRet)
> cout << "Spawned a new thread!" << endl;
>
> // Release resources used by the critical section object.
> DeleteCriticalSection(&csConsumer);

Wrapped in init/delete calls again, this time for the other CS.

Sorry, but both wraps are wrong. You have to initialise the critical
sections on startup and delete them when you don't need them any more (here
before main() returns). In between, you can use them, e.g. you have to lock
one (EnterCriticalSection) whenever you want to access (read or write) the
shared data it guards. Then, when you're done, you unlock it again
(LeaveCriticalSection), so other threads can access the data.

Uli

--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Geschäftsführer: Thorsten Föcking, Amtsgericht Hamburg HR B62 932
From: Larry on
"Ulrich Eckhardt" <eckhardt(a)satorlaser.com> ha scritto nel messaggio
news:5bu837-5f.ln1(a)satorlaser.homedns.org...

> Sorry, but both wraps are wrong. You have to initialise the critical
> sections on startup and delete them when you don't need them any more
> (here
> before main() returns). In between, you can use them, e.g. you have to
> lock
> one (EnterCriticalSection) whenever you want to access (read or write) the
> shared data it guards. Then, when you're done, you unlock it again
> (LeaveCriticalSection), so other threads can access the data.

I tried that, but it is not working like I was expecting...I still get a run
time error when I disconnect from the server (I'm using telnet)

/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};

struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int
flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};

struct circular
{
circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;
map<int, HANDLE>::iterator eit;

// Declare Procuder && Cosumer CS

CRITICAL_SECTION csProducer;
CRITICAL_SECTION csConsumer;

int main()
{
// Initialize the critical section
InitializeCriticalSection(&csProducer);

// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();

// Initialize the critical section
InitializeCriticalSection(&csConsumer);

// Spawn a new Consumr Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;

}

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);

EnterCriticalSection(&csConsumer);
eventi.insert(make_pair(threadid,hevent));
LeaveCriticalSection(&csConsumer);

// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

EnterCriticalSection(&csConsumer);
users.insert(make_pair(threadid, c));
LeaveCriticalSection(&csConsumer);

//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
if(eventi[threadid])
{
WaitForSingleObject(eventi[threadid], INFINITE);
} else {
DeleteCriticalSection(&csConsumer);
return 0;
}

EnterCriticalSection(&csConsumer);
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
LeaveCriticalSection(&csConsumer);
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);

EnterCriticalSection(&csConsumer);
eventi.erase(threadid);
LeaveCriticalSection(&csConsumer);

// Remove buffer from the map
EnterCriticalSection(&csConsumer);
users.erase(threadid);
LeaveCriticalSection(&csConsumer);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
s->Close();
delete s;
return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);

// Request ownership of the critical section.
EnterCriticalSection(&csProducer);

for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30,
BUFF_DONE));

if(eventi[uit->first])
SetEvent(eventi[uit->first]);

cout << "Producer is writing to: " << uit->first << endl;
}

// Release ownership of the critical section.
LeaveCriticalSection(&csProducer);
}
// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);
return 0;
}

void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}