Чтение асинхронно с портами завершения ввода-вывода данных (I/O)


Пример ниже демонстрирует использование ниже перечисленных функций с портами завершения ввода-вывода данных (I/O):

Поскольку этот пример - законченное приложение, оно также демонстрирует и использование нескольких других функций.

Детали примера

Типовой код демонстрирует многопоточное приложение, разработанное, чтобы  читать и анализировать большой файл. Асинхронный ввод - вывод (I/O) используется, чтобы читать порцию файла за один прием из пула рабочих потоков. Завершенный ввод - вывод (I/O) переправляется в порт завершения ввода-вывода данных (I/O). Рабочий поток перехватывает завершенный ввод - вывод (I/O), анализирует его, сообщает о результатах и начинает другую асинхронную операцию чтения. Процесс продолжается до тех пор, пока не будет прочитан весь файл.

Число потоков базируется на числе процессоров в главном компьютере. В особенности, если это число кратно двум. Дополнительные подробности смотри в статье  Порты завершения ввода-вывода данных (I/O).

Затеянный "анализ" - это подсчет числа идентичных последовательных пар байтов в каждом куске программы.

Чтобы проанализировать файл, он задается в командной строке. Пример становится более содержательным, если заданный файл является по величине 1 МБ или больший.

Код состоит из одного потока, запущенного функцией main и нескольких идентичных рабочих потоков запущенных, функцией называемой readAndAnalyzeChunk. Эти функции описаны ниже.

функция main

  1. Выясняет число процессоров, которые являются связанными с параллельными значениями порта завершения ввода-вывода данных (I/O).
  2. Открывает исходный файл, который будет анализироваться для не буферизованного асинхронного ввода - вывода (I/O).
  3. Выясняет размер файла.
  4. Инициализирует указатель чтения, который должен быть доступен рабочим потокам внутри критической секции.
  5. Создает отдельный порт завершения ввода-вывода данных (I/O) в котором завершается чтение записанной единицы данных.

  6. Создает пул рабочих потоков (двойное число процессоров в системе) и начинает каждый из них запускать при помощи readChunkAndAnalyze.
  7. Помещает отдельно взятое "стартовое" завершение ввода-вывода в порт завершения ввода-вывода данных (I/O), который побуждает рабочие потоки начать работу.
  8. Ожидает любой из рабочих потоков, который уходит из программы, показывая, что он проанализировал данные, которые были прочитаны из последней порции файла.
  9. Помещает установки "выход" завершения ввода-вывода в порт завершения ввода-вывода данных (I/O). Это указывает на то, что потоки должны выйти из работы после завершения любого оставшегося анализа.
  10. Ожидает все завершающие работу потоки.
  11. Выводит на экран немного статистики и выходит из программы.

функция readAndAnalyzeChunk

  1. Ожидает завершения ввода - вывода (I/O) в порте завершения ввода-вывода данных (I/O) после создания.
  2. Один из потоков начинает чтение файла тогда, когда основной программой посылается  код "старт".
  3. Анализирует блок данных, возвращенный при помощи входа в критическую секцию, извлекает указатель чтения, модифицирует указатель чтения для следующего потока, покидает критическую секцию.
  4. Выходит из программы, когда  посылается код "выход".

Код примера

[C++]
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <conio.h>

// Назначения для этого примера, в который мы имеем максимум 16 процессоров.
#define MAX_THREADS	32
#define BUFFER_SIZE (64*1024)
DWORD dwNumProcessors;

// Дескриптор анализируемого источникового файла
HANDLE hSourceFile=NULL;

// Дескриптор порта завершения ввода-вывода данных (I/O)
HANDLE hIOCompletionPort=NULL;

// Позиция чтения
ULARGE_INTEGER readPointer;

// Критическая секция
CRITICAL_SECTION critSec;

// Функция ThreadProc
DWORD WINAPI readAndAnalyzeChunk(LPVOID lpParam);

// Коды, которые управляют характером работы потока.
#define KICKOFFKEY  99  // Начало чтения и анализа файла
#define KEY          1  // Чтение следующей порции файла и анализ ее
#define EXITKEY     86  // Выход из работы(файл был прочитан и проанализирован)

// Структурв, отслеживающая ожидающие обработки операции I/O
typedef struct _CHUNK {
    OVERLAPPED overlapped;
    LPVOID buffer;
} CHUNK, *PCHUNK;

//////////////////////////////////////////////////////////////////////
int main(
    int argc,
    char *argv[],
    char *envp)
{
    HANDLE hThread[MAX_THREADS];
    DWORD  dwThreadId,
           dwStatus,
           dwStartTime,
           dwEndTime,
           dwExitStatus = ERROR_SUCCESS;
    ULARGE_INTEGER fileSize,
                   readPointer;
    SYSTEM_INFO systemInfo;
    UINT i;
    BOOL bInit=FALSE;
    OVERLAPPED  kickoffOverlapped,
                dieOverlapped;

    // Убедимся, что источниковый файл определяется в командной строке.

    if (argc != 2) 
    {
        fprintf(stderr, "Используется: %s <source file>\n", argv[0]);
        dwExitStatus=1;
        goto EXIT;
    }

    // Получим число процессоров в системе.
		
    GetSystemInfo(&systemInfo);
    dwNumProcessors = systemInfo.dwNumberOfProcessors;

    // Откроем источниковый файл.

    hSourceFile = CreateFile(argv[1],
                            GENERIC_READ,
                            FILE_SHARE_READ,
                            NULL,
                            OPEN_EXISTING,
                            FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED,
                            NULL);
    if (hSourceFile == INVALID_HANDLE_VALUE) 
    {
        fprintf(stderr, "%s: Открыть не удалось %s, ошибка %d\n",
            argv[0],
            argv[1],
            dwExitStatus = GetLastError());
        goto EXIT;
    }    
    fileSize.LowPart = GetFileSize(hSourceFile, &fileSize.HighPart);

    if ((fileSize.LowPart==0xffffffff) && (GetLastError()!= NO_ERROR))
    {
        fprintf(stderr, "%s: GetFileSize завершилась ошибкой, ошибка %d\n",
            argv[0],
            dwExitStatus = GetLastError());
        goto EXIT;
    }

    // Используем критическую секцию, чтобы преобразовать в последовательную
    // форму доступ к нескольким потокам.
    InitializeCriticalSection(&critSec);
    bInit=TRUE;

    // Создаем порт завершения I/O. 
    hIOCompletionPort = CreateIoCompletionPort(hSourceFile,
                            NULL,  // нет существующих портов завершения I/O
                            KEY,   // принимаемый код в пакете завершения I/O
                            dwNumProcessors); // максимум рабочих потоков

    if (hIOCompletionPort == NULL)
    {
        fprintf(stderr,
                "%s: Порт заверешения I/O создать не удалось (ошибка %d)\n",
                argv[0],
                dwExitStatus = GetLastError());
        goto EXIT;
    }

    // Инициализируем указатель файла на асинхронную структуру
    readPointer.LowPart = readPointer.HighPart = 0;

    dwStartTime = GetTickCount();

    // Запускаем рабочие потоки.
    for (i=0; i<2*dwNumProcessors; i++)
    {
        hThread[i]=CreateThread(NULL,
            0,  // размер стека по умолчанию
            (LPTHREAD_START_ROUTINE) readAndAnalyzeChunk,
            (LPVOID) &fileSize,
            0,  // выполнение немедленно
            &dwThreadId);
        if (hThread[i] == NULL)
        {
            fprintf(stderr, "%s: Поток создать не удалось #%d (ошибка %d)\n",
                argv[0], i, dwExitStatus=GetLastError());
            goto EXIT;
        }
    }
    // Помещаем в очередь начальное событие.
    PostQueuedCompletionStatus(hIOCompletionPort,
        0,
        KICKOFFKEY,
        &kickoffOverlapped);

    // Ждем, когда рабочий поток завершит работу.
    dwStatus = WaitForMultipleObjects(2*dwNumProcessors,
        hThread,
        FALSE,
        INFINITE);
    if (dwStatus == WAIT_FAILED)
    {
        fprintf(stderr, "%s: Ожидание прервано (ошибка %d)\n", argv[0], 
            dwExitStatus=GetLastError());
        goto EXIT;
    }
    // Рабочий поток возвратил значение; отправим сообщение,
    // чтобы поток вышел из программы.
    for (i=0; i<2*dwNumProcessors-1; i++)
    {
        PostQueuedCompletionStatus(hIOCompletionPort,
            0,
            EXITKEY,
            &dieOverlapped);
    }
    // Ждем, когда поток закончит свою работу и завершит всякую деятельность.
    dwStatus = WaitForMultipleObjects(2*dwNumProcessors,
        hThread,
        TRUE,
        INFINITE);

    if (dwStatus == WAIT_FAILED)
    {
        fprintf(stderr, "%s: Ожидание прервано (ошибка %d)\n",
           argv[0], dwExitStatus=GetLastError());
        goto EXIT;
    }
    dwEndTime = GetTickCount();
    printf( "\n\n%d байтов проанализировано за %.3f секунд\n", fileSize.LowPart,
        (float)(dwEndTime-dwStartTime)/1000.0);
    printf( "%.2f Mб/сек\n",
        ((LONGLONG)fileSize.QuadPart/(1024.0*1024.0))/
        (((float)(dwEndTime-dwStartTime))/1000.0));

EXIT:
    (void) _getch();
    if (bInit)
        DeleteCriticalSection(&critSec);
    if (hThread[i])
        CloseHandle(hThread[i]);
    if (hIOCompletionPort)
        CloseHandle(hIOCompletionPort);
    if (hSourceFile)
        CloseHandle(hSourceFile);

    exit(dwExitStatus);
}

//////////////////////////////////////////////////////////////////////
DWORD WINAPI readAndAnalyzeChunk(LPVOID lpParam)
{
    BOOL   bSuccess,
           bMoreToRead;
    DWORD  dwNumBytes,
           dwKey,
           dwSuccess,
           i,
           repeatCnt,
           dwThreadId;
    LPOVERLAPPED completedOverlapped;
    PCHUNK completedChunk;
    CHUNK  chunk;

    printf("Поток (%d) запущен\n", dwThreadId=GetCurrentThreadId());

    chunk.buffer=VirtualAlloc(NULL, BUFFER_SIZE, MEM_COMMIT, PAGE_READWRITE);

    if (chunk.buffer==NULL)
    {
        fprintf(stderr, "Функция VirtualAlloc завершилась ошибкой (ошибка %d)\
                n", GetLastError());
        exit(1);
    }
    
    // Начало асинхронного чтения. Ждем завершения чтения, затем читаем дальше.
    while(1){
    bSuccess=GetQueuedCompletionStatus(hIOCompletionPort,
        &dwNumBytes,
        &dwKey,
        &completedOverlapped,
        INFINITE);
    if (!bSuccess && (completedOverlapped==NULL))
    {
        fprintf(stderr, "GetQueuedCompletionStatus завершилась ошибкой
               (ошибка %d)\n", GetLastError());
        exit(1);
    }
    if (dwKey==EXITKEY)
    {
        VirtualFree((LPVOID) chunk.buffer,
            0,
            MEM_RELEASE);
        ExitThread(0);
    }
    if (!bSuccess)
    {
        fprintf(stderr, "GetQueuedCompletionStatus переместила 
              испорченный пакет I/O (ошибка %d)\n", GetLastError());
        // Не смотря на то, что вы можете здесь завершить работу
        // по ошибке этот пример продолжается.
    }

    if (dwKey != KICKOFFKEY) 
    {
        // Анализируем данные. Анализ в этом примере выявляет количество
        // пар последовательных байтов, которые являются равны друг другу.
        printf("Проанализирована %d часть байтов\n", dwNumBytes);
        completedChunk = (PCHUNK)completedOverlapped;
        repeatCnt = 0;
        for (i = 1; i < dwNumBytes; i++)
            if ((((PBYTE)completedChunk->buffer)[i - 1] ^ 
                 ((PBYTE)completedChunk->buffer)[i]) == 0)
                repeatCnt++;
        printf("Подсчет повторов %d (поток #%d)\n", repeatCnt, dwThreadId);

        // Если число возвращенных байтов меньше, чем BUFFER_SIZE, 
        // которое было прочитано последний раз. Выход потока из программы.
        if (dwNumBytes < BUFFER_SIZE)
            ExitThread(0);
    }
    // Настроим структуру OVERLAPPED для последующего чтения.
    EnterCriticalSection(&critSec);

    if (readPointer.QuadPart < ((ULARGE_INTEGER *)lpParam)->QuadPart) 
    {
        bMoreToRead = TRUE;
        chunk.overlapped.Offset = readPointer.LowPart;
        chunk.overlapped.OffsetHigh = readPointer.HighPart;
        chunk.overlapped.hEvent = NULL;     // не нужен
			
        // Установим указатель позиции в файле для следующего чтения
        readPointer.QuadPart += BUFFER_SIZE;
    }
    else bMoreToRead = FALSE;
				
    LeaveCriticalSection(&critSec);

    // Происходит последующее чтение.

    if (bMoreToRead) 
    {
        dwSuccess = ReadFile(hSourceFile,
            chunk.buffer,
            BUFFER_SIZE,
            &dwNumBytes,
            &chunk.overlapped);
        if (!dwSuccess && (GetLastError() == ERROR_HANDLE_EOF))
            printf( "Конец файла\n" );
        if (!dwSuccess && (GetLastError() != ERROR_IO_PENDING))
            fprintf (stderr, "ReadFile при %lx завершилась ошибкой (ошибка %d)
                     \n",
                chunk.overlapped.Offset,
                GetLastError());
    }
    } // пока конец
}

Назад в оглавление темы
На главную страницу темы

Hosted by uCoz