Domanda

I've implemented Asynchronous I/O with a callback where I'm worried about concurrency. I contest to you that since I'm always working with the same file and the OS file physical I/O is fundamentally a synchronous operation then I won't need a lock mechanism in my callback method - but I may very well be wrong here - enter SO :o) I have a buffer manager that puts the read data into its buffer cache when the read operation has completed and a state engine for each overlapped operation based on the EOverlappedStates enumeration states ; "I/O not started", "Success", and "Error". Do you think that I need locking in the callback method to ensure concurrency in a multithreaded program like ours?

Open file:

OS_FILE_HANDLE CUniformDiskInterface::OpenFile(const wchar_t *fileName, bool *fileExists, bool readData, bool writeData, bool overlap, 
bool disableDiskCache, bool disableOsCache, bool randomAccess, bool sequentalScan) {
// Set access method
DWORD desiredAccess = readData ? GENERIC_READ : 0;
desiredAccess |= writeData ? GENERIC_WRITE : 0;

// Set file flags
DWORD fileFlags = disableDiskCache ? FILE_FLAG_WRITE_THROUGH : 0;
fileFlags |= disableOsCache ? FILE_FLAG_NO_BUFFERING : 0;
fileFlags |= randomAccess ? FILE_FLAG_RANDOM_ACCESS : 0;
fileFlags |= sequentalScan ? FILE_FLAG_SEQUENTIAL_SCAN : 0;
fileFlags |= !fileFlags ? FILE_ATTRIBUTE_NORMAL : 0;
fileFlags |= overlap ? FILE_FLAG_OVERLAPPED : 0;

HANDLE hOutputFile = CreateFile(
    fileName,
    desiredAccess,
    0,
    NULL,
    OPEN_EXISTING,
    fileFlags,
    NULL);

Read file:

_UINT64 CUniformDiskInterface::ReadFromFile(OS_FILE_HANDLE hFile, void *outData, _UINT64 bytesToRead, OVERLAPPED *overlapped, LPOVERLAPPED_COMPLETION_ROUTINE completionRoutine) {
DWORD wBytesRead = 0;

BOOL result = completionRoutine ? 
    ReadFileEx(hFile, outData, (DWORD)(bytesToRead), overlapped, completionRoutine) : 
    ReadFile(hFile, outData, (DWORD)(bytesToRead), &wBytesRead, overlapped);

if (!result)
{
    int errorCode = GetLastError();
    if (errorCode != ERROR_IO_PENDING )
    {
        wstringstream err(wstringstream::in | wstringstream::out);
        err << L"Can't read sectors from file. [ReadFile] error #" << errorCode << L".";
        throw new FileIOException(L"CUniformDiskInterface", L"ReadFromFile", err.str().c_str(), GETDATE, GETFILE, GETLINE); 
    }
}

return (_UINT64)wBytesRead; }

Extended overlapped structure:

            /*!
        \enum EOverlappedStates
        \brief The different overlapped states
        \details Used as inter-thread communication while waiting for the I/O operation to complete 
        */
        enum EOverlappedStates
        {
            /** The I/O operation has not started or in in-progress */
            EOverlappedNotStarted, 

            /** The I/O operation is done and was successful */
            EOverlappedSuccess, 

            /** The I/O operation is done but there was an error */
            EOverlappedError
        };

        /*!
        \struct OverlappedEx
        \brief Extended overlapped structure
        */
        struct OverlappedEx : OVERLAPPED
        {           
            /** The buffer manager that is designated to cache the record when it's loaded */
            CBufferManager *bufferManger;

            /** Transaction ID related to this disk I/O operation */
            _UINT64 transactionId;

            /** Start disk sector of the record */
            _UINT64 startDiskSector;

            /** Buffer */
            void *buffer;

            /** Number of bytes in \c buffer */
            _UINT64 bufferSize;

            /** Current overlapped I/O state. Used for inter-thread communication while waiting for the I/O to complete */
            EOverlappedStates state;

            /** Error code, or \c 0 if no error */
            _UINT32 errorCode;
        };

Callback method:

    /*! \brief Callback routine after a overlapped read has completed
\details Fills the buffer managers buffer cache with the read data
\todo This callback method may be a bottleneck, so look into how to handle this better
*/
VOID WINAPI CompletedReadRoutine(DWORD dwErr, DWORD cbBytesRead, LPOVERLAPPED lpOverLap) 
{ 
    OverlappedEx *overlapped = (OverlappedEx*)lpOverLap;
    overlapped->errorCode = (_UINT32)dwErr;

    if (!dwErr && cbBytesRead) 
    { 
        overlapped->state = EOverlappedSuccess;
        overlapped->bufferManger->AddBuffer(overlapped->startDiskSector, overlapped->buffer, overlapped->bufferSize);
    }
    else
    {
        // An error occurred
        overlapped->state = EOverlappedError;
    }
} 

Usage:

    _UINT64 startDiskSector = location / sectorByteSize;
void *buffer = bufferManager->GetBuffer(startDiskSector);
if (!buffer)
{
    /*
    The disk sector was not cached, so get the data from the disk and cache in internal memory with
    the buffer manager
    */
    buffer = new char[recordByteSize];

    // Create a overlapped structure to enable disk async I/O operations
    OverlappedEx *overlapped = new OverlappedEx;
    memset(overlapped, 0, sizeof(OverlappedEx));
    overlapped->Offset = (DWORD)(startDiskSector & 0xffffffffULL);
    overlapped->OffsetHigh = (DWORD)(startDiskSector >> 31ULL);
    overlapped->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    overlapped->bufferManger = bufferManager;
    overlapped->startDiskSector = startDiskSector;
    overlapped->buffer = buffer;
    overlapped->bufferSize = recordByteSize;
    overlapped->state = EOverlappedNotStarted;

    // Read from disk
    diskApi.ReadFromFile(fileHandle, buffer, sectorByteSize, overlapped, CompletedReadRoutine);
    return overlapped;
}
È stato utile?

Soluzione

According to the documentation on MSDN, the callback function will only ever be called on the same thread that called the ReadFileEx function and only when the thread is waiting for events to occur. So there are guaranteed no synchronisation issues between the call to ReadFileEx and the invocation of the callback.

This means that there is no need to synchronize access to the OverlappedEx data structure as long as only one thread tries to read into a particular instance of that structure, which is equivalent to reading a particular file only from one thread. If you try to read a single file from multiple thread, it is likely that you encounter problems within Windows itself (I don't think the asynchronous I/O is thread-safe in itself), so locking a mutex won't help you in that case.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top