From: Skywing on
Make sure that you don't unwind the stack past the OVERLAPPED structure
though, i.e. don't return from the function where it was declared as a local
variable (if you create it on the stack) until you are certain there are no
pending operations using the OVERLAPPED.

"Tim Roberts" <timr(a)probo.com> wrote in message
news:55ai72lqmcfqnqutf099fm2ehpc9013eda(a)4ax.com...
> Rod <Siegen032005(a)end.of.message> wrote:
>>
>>I know to do it in a separate thread. What I don't
>>understand is how to use either an "overlapped" result,
>
> If you use a separate thread, it is not NECESSARY to use a second thread.
> Just let the call block. That's the beauty of using another thread.
>
> On the other hand, using an overlapped result is not hard. Create an
> OVERLAPPED structure on the stack. Create an event using CreateEvent, and
> shove its handle into the hEvent. Pass the structure to
> ReadDirectoryChangesW. The function will return immediately.
>
> Now you can go do something else, and when you run out of things to do,
> call WaitForSingleEvent on the event in the overlap structure. When it
> finally finishes, use GetOverlappedResult to get the results.
>
> Be sure to open the directory with FILE_FLAG_OVERLAPPED.
> --
> - Tim Roberts, timr(a)probo.com
> Providenza & Boekelheide, Inc.


From: Tim Roberts on
Tim Roberts <timr(a)probo.com> wrote:

>Rod <Siegen032005(a)end.of.message> wrote:
>>
>>I know to do it in a separate thread. What I don't
>>understand is how to use either an "overlapped" result,
>
>If you use a separate thread, it is not NECESSARY to use a second thread.
>Just let the call block. That's the beauty of using another thread.

Hmm. That may be the most confusing sentence I ever wrote. Let me try
again:

If you use a separate thread, it is not NECESSARY to use an overlapped
result. Just let the call block. That's the beauty of using another
thread.
--
- Tim Roberts, timr(a)probo.com
Providenza & Boekelheide, Inc.
From: Rod on
On Sat, 27 May 2006 16:25:22 +1000 Rod <Siegen032005(a)end.of.message> wrote:


Thanks to everyone for their assistance.

Cheers,


__ Rod.
From: ???? on
class P2PFileShare
{
typedef struct
{
OVERLAPPED ov;


BYTE buff[1024];


LPTSTR path;
DWORD flag;


HANDLE handle;
}PATH_OV, *LPPATH_OV;


typedef struct
{
LPTSTR name; // 文件名称
DWORD time; // 通知时间
}FILE_NOTIFY;


public:
P2PFileShare()
: mh_IOCP(NULL)
, mn_OVPtr(0)
, mp_OVPtr(NULL)
, mn_Notify(0)
, mp_Notify(NULL)
{
}


virtual~P2PFileShare()
{
Close(TRUE);
}
private:
// 创建工作线程
HRESULT _CreateWorkerThread();


// 工作线程
#ifndef _WIN32_WCE
static UINT WINAPI _WorkerThreadProc(IN LPVOID pData);
#else
static DWORD WINAPI _WorkerThreadProc(IN LPVOID pData);
#endif // #ifndef _WIN32_WCE
HRESULT _WorkerThreadProc();


public:
HRESULT Start();
VRESULT Close(IN CONST BOOL bWait = FALSE);
public:
// 监视指定目录
HRESULT MonitorPath(IN LPCTSTR sFileName);
// 文件变化通知
LPTSTR GetNotify();
private:
HANDLE mh_IOCP;


MLONG mn_OVPtr;
LPPATH_OV* mp_OVPtr;


MLONG mn_Notify;
FILE_NOTIFY* mp_Notify;
public:
INLINE VRESULT EnterLock() {mo_cs.EnterLock();}
INLINE VRESULT LeaveLock() {mo_cs.LeaveLock();}
private:
MTCSObject mo_cs;



};


// 创建工作线程(根据 CPU
的数量,创建相应数量的工作线程)
HRESULT P2PFileShare::_CreateWorkerThread()
{
HRESULT hr = E_FAIL;

HANDLE hThread;
#ifndef _WIN32_WCE
if((hThread = (HANDLE)_beginthreadex(NULL, 0
, _WorkerThreadProc
, (LPVOID)this, 0,
NULL)) == 0)
{
return _doserrno;
}
#else
if((hThread = (HANDLE)::CreateThread(NULL, 0
, _WorkerThreadProc
, (LPVOID)this, 0,
&NULL)) == 0)
{
return ::GetLastError();
}
#endif
::CloseHandle(hThread); // 关闭句柄避免资源泄漏
hr = S_OK;


return hr;



}


// 工作线程
#ifndef _WIN32_WCE
UINT P2PFileShare::_WorkerThreadProc(IN LPVOID pData)
#else
DWORD P2PFileShare::_WorkerThreadProc(IN LPVOID pData)
#endif // #ifndef _WIN32_WCE
{
((P2PFileShare*)pData)->_WorkerThreadProc();

#ifndef _WIN32_WCE
_endthreadex(0);
#else
ExitThread(0);
#endif
return 0;



}


// 数据处理线程函数
HRESULT P2PFileShare::_WorkerThreadProc()
{
// 注意: 调用 GetQueuedCompletionStatus
的线程都将被放到完成端口的等待线程队列中
// 完成操作循环

BOOL bSucceed;
DWORD dwBytes;


LPDWORD pCT;
PATH_OV* pOV;


for(;;)
{
bSucceed = ::GetQueuedCompletionStatus(mh_IOCP

, &dwBytes

, (LPDWORD)&pCT

, (LPOVERLAPPED*)&pOV

, INFINITE

);
if(bSucceed)
{
if(NULL == pOV) break; //
退出工作线程


FILE_NOTIFY_INFORMATION * pfiNotifyInfo =
(FILE_NOTIFY_INFORMATION*)pOV->buff;


DWORD dwNextEntryOffset;
TCHAR sFileName[1024];


do
{
dwNextEntryOffset =
pfiNotifyInfo->NextEntryOffset;


DWORD dwAction = pfiNotifyInfo->Action;

DWORD dwFileNameLength =
pfiNotifyInfo->FileNameLength;


CPY_W2T(sFileName,
pfiNotifyInfo->FileName,
dwFileNameLength/sizeof(WCHAR));


switch(dwAction)
{
case FILE_ACTION_REMOVED:
// 文件删除
{
LPTSTR sFullName = new
TCHAR[LPTSTRLen(pOV->path) +
LPTSTRLen(sFileName) + 1];
if(NULL != sFullName)
{

LPTSTRCpy(sFullName, pOV->path);

LPTSTRCat(sFullName, sFileName);

LPTSTRPrintf(__T("Del %s\n"), sFullName);


delete[]
sFullName;
}
}
break;
case FILE_ACTION_ADDED:
// 文件替换
{
//
替换文件时只会触发 FILE_ACTION_ADDED,
因此需要手工触发 FILE_ACTION_MODIFIED
LPTSTRPrintf(__T("Add
%s\n"), sFileName);
}
case FILE_ACTION_MODIFIED:
// 文件修改
{
//
测试文件是否关闭
LPTSTR sFullName = new
TCHAR[LPTSTRLen(pOV->path) +
LPTSTRLen(sFileName) + 1];
if(NULL != sFullName)
{

LPTSTRCpy(sFullName, pOV->path);

LPTSTRCat(sFullName, sFileName);
HANDLE hFile =
::CreateFile(sFullName

, GENERIC_WRITE

, FILE_SHARE_WRITE

, NULL

, OPEN_EXISTING

, FILE_ATTRIBUTE_NORMAL

, NULL

);

if(INVALID_HANDLE_VALUE == hFile)
{
HRESULT
hr = ::GetLastError();

LPTSTRPrintf(__T("Locked %s %d\n"), sFileName, hr);
}
else
{

::CloseHandle(hFile);



LPTSTRPrintf(__T("Modify %s\n"), sFileName);


LONG i;


EnterLock();

for(i=0;i<mn_Notify;i++)
{

if(LPTSTRCompare(mp_Notify[i].name, sFullName) == 0)

{

mp_Notify[i].time = ::GetTickCount();

break;

}
}


if(i >=
mn_Notify)
{

FILE_NOTIFY* pNotify = new FILE_NOTIFY[mn_Notify + 1];

if(NULL != pNotify)

{

if(mn_Notify > 0)

{

::CopyMemory(pNotify, mp_Notify,
sizeof(FILE_NOTIFY)*mn_Notify);

delete[] mp_Notify;

}



pNotify[mn_Notify].name = sFullName; sFullName = NULL;

pNotify[mn_Notify].time = ::GetTickCount();



mp_Notify = pNotify;

++mn_Notify;

}
}

LeaveLock();
}
if(NULL !=
sFullName) delete[] sFullName;
}
}
break;
case FILE_ACTION_RENAMED_OLD_NAME:
// 文件改名
{
if(dwNextEntryOffset !=
0)
{
pfiNotifyInfo=
(FILE_NOTIFY_INFORMATION*)((BYTE*)pfiNotifyInfo +
dwNextEntryOffset);
}
dwNextEntryOffset =
pfiNotifyInfo->NextEntryOffset;


DWORD dwAction =
pfiNotifyInfo->Action;
DWORD dwFileNameLength
= pfiNotifyInfo->FileNameLength;


if(dwAction ==
FILE_ACTION_RENAMED_NEW_NAME)
{
TCHAR
sNewName[1024];

CPY_W2T(sNewName, pfiNotifyInfo->FileName,
dwFileNameLength/sizeof(WCHAR));



LPTSTRPrintf(__T("Rename %s -> %s\n"), sFileName, sNewName);
}
else
{
continue;
}
}
break;
}


if(dwNextEntryOffset != 0)
{
pfiNotifyInfo=
(FILE_NOTIFY_INFORMATION*)((BYTE*)pfiNotifyInfo +
dwNextEntryOffset);
}
}while (dwNextEntryOffset != 0);


// 投递目录监视
::ZeroMemory(pOV->buff, 1024);
::ReadDirectoryChangesW( pOV->handle

, pOV->buff

, 1024

, FALSE

, pOV->flag

, NULL

, (OVERLAPPED*)pOV

, NULL

);
}
}


EnterLock();
while(mn_Notify > 0)
{
--mn_Notify;
delete[] mp_Notify[mn_Notify].name;
}
delete[] mp_Notify;
mp_Notify = NULL;


while(mn_OVPtr > 0)
{
::InterlockedDecrement(&mn_OVPtr);
pOV = mp_OVPtr[mn_OVPtr];


::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
}
delete[] mp_OVPtr;
mp_OVPtr = NULL;


::CloseHandle(mh_IOCP);
mh_IOCP = NULL;


LeaveLock();


return S_OK;



}


HRESULT P2PFileShare::Start()
{
HRESULT hr;

EnterLock();


USP_ASSERT(mh_IOCP == NULL);


// 创建完成端口
mh_IOCP = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
NULL,
0);
if(NULL == mh_IOCP)
{
hr = ::GetLastError();
}
else
{
hr = _CreateWorkerThread();
if(S_OK != hr)
{
::CloseHandle(mh_IOCP);
mh_IOCP = NULL;
}
}


LeaveLock();


return hr;



}


VRESULT P2PFileShare::Close(IN CONST BOOL bWait)
{
::PostQueuedCompletionStatus(mh_IOCP, 0, NULL, NULL); //
通知工作线程关闭

if(bWait)
{
while(mn_OVPtr > 0) ::Sleep(10);
}



}


// 监视共享目录
HRESULT P2PFileShare::MonitorPath(IN LPCTSTR sPath)
{
USP_ASSERT(mh_IOCP != NULL);
if(NULL == mh_IOCP) return E_FAIL;

PATH_OV*pOV = new PATH_OV;
if(NULL == pOV) return E_OUTOFMEMORY;
::ZeroMemory(pOV, sizeof(PATH_OV));


pOV->path = NEW_T2T(sPath);
if(NULL == pOV->path)
{
delete pOV;
}


// 创建目录句柄
pOV->handle = ::CreateFile( pOV->path
,
FILE_LIST_DIRECTORY
,
FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE
, NULL
,
OPEN_EXISTING
,
FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED
, NULL
);
if(INVALID_HANDLE_VALUE == pOV->handle)
{
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}


// 帮定目录句柄
if(NULL == ::CreateIoCompletionPort(pOV->handle, mh_IOCP, NULL,
0))
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}


// 提交目录监视
pOV->flag = FILE_NOTIFY_CHANGE_LAST_WRITE |
FILE_NOTIFY_CHANGE_FILE_NAME;


BOOL bSucceed = ::ReadDirectoryChangesW( pOV->handle

, pOV->buff

, 1024

, FALSE

, pOV->flag

, NULL

, (OVERLAPPED*)pOV

, NULL

);


if(!bSucceed)
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}


HRESULT hr = S_OK;
LONG i;
EnterLock();
for(i=0;i<mn_OVPtr;i++)
{
if(LPTSTRCompare(mp_OVPtr[i]->path, pOV->path) == 0)
{
hr = S_FALSE;
break;
}
}


if(i >= mn_OVPtr)
{
LPPATH_OV* pOVPtr = new LPPATH_OV[mn_OVPtr + 1];
if(NULL == pOVPtr)
{
hr = E_OUTOFMEMORY;
}
else
{
if(mp_OVPtr != NULL)
{
::CopyMemory(pOVPtr, mp_OVPtr,
sizeof(LPPATH_OV)*mn_OVPtr);
delete[] mp_OVPtr;
}


pOVPtr[mn_OVPtr] = pOV;
mp_OVPtr = pOVPtr;
::InterlockedIncrement(&mn_OVPtr);
}
}
LeaveLock();


if(S_OK != hr)
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return hr;
}


return S_OK;



}


// 文件变化通知
LPTSTR P2PFileShare::GetNotify()
{
LPTSTR sFileName = NULL;

DWORD nTime = ::GetTickCount();


EnterLock();


for(LONG i=0;i<mn_Notify;i++)
{
if(nTime - mp_Notify[i].time >= 1*1000)
{
sFileName = mp_Notify[i].name;


if(mn_Notify - i > 1)
{
::CopyMemory(mp_Notify + i, mp_Notify +
i + 1, (mn_Notify - i -
1)*sizeof(FILE_NOTIFY));
}
--mn_Notify;
if(mn_Notify == 0)
{
delete[] mp_Notify;
mp_Notify = NULL;
}
break;
}
}


LeaveLock();


return sFileName;



}


int _tmain(IN INT nArgc, IN LPCTSTR* psArgv)
{
UNREFERENCED_PARAMETER(nArgc);
UNREFERENCED_PARAMETER(psArgv);

#ifdef UNICODE
CRTSetLocale(); // 设置本地化开关,保证在
UniCode
下可以输出汉字
#endif // UNICODE


ConsoleInit();


HRESULT hr;


P2PFileShare oShare;


hr = oShare.Start();


if(S_OK == hr)
{
hr =
oShare.MonitorPath(__T("E:\\EPServer\\bin\\incoming\\"));
hr =
oShare.MonitorPath(__T("E:\\EPServer\\bin\\incomtmp\\"));


AFSP sInput;
for(;;)
{
LPTSTR sFileName = oShare.GetNotify();
if(NULL != sFileName)
{
//
有一个文件已经完全复制完毕
LPTSTRPrintf(__T("Hashing %s\n"),
sFileName);


// ...


delete[] sFileName;
}
::Sleep(1000);


// sInput.Attach(ConsoleGetStringNoEcho());
// if(0 == LPTSTRICompare(sInput, __T("exit")))
break;
// else
// if(0 == LPTSTRICompare(sInput, __T("quit")))
break;
}


oShare.Close();
}


ConsoleTerm();
return S_OK == hr ? 0 : -1;



}