wiki:bgas-user:named-pipes

CN-ION Communication using Named Pipes

An example for the use of named pipes for communication between BG/Q compute nodes and IO nodes. The code performs creation of two named pipes, namely one for CN-to-ION communication and one for ION-to-CN communication; a ping-pong communication; and cleanup.

The following code is executed on the compute node side:

#include "common.h"

const unsigned int timeout = 30; // seconds

int main() {

        int rc;

        // Note that mkfifo does not work on BG/Q compute nodes; thus, the IO node has to do the mkfifo call.
        // On the CN side, we implement a timeout mechanism for opening the file object representing the pipe,
        // as a single open() call might fail due to the pipe not having been created yet.

        int i;
        int cn_ion_pipefd = -1;
        int ion_cn_pipefd = -1;
        for (i=0; i<timeout; i++) {

                if (cn_ion_pipefd < 0)
                        cn_ion_pipefd = open( cn_ion_pipe_name, O_WRONLY); // this pipe is used for sending data to the ION
                if (ion_cn_pipefd < 0)
                        ion_cn_pipefd = open( ion_cn_pipe_name, O_RDONLY); // this pipe is used for receiving data from the ION
                if ((cn_ion_pipefd >= 0) && (ion_cn_pipefd >= 0))
                        break;
                sleep(1);
        }

        if ((cn_ion_pipefd < 0) || (ion_cn_pipefd < 0)) {
                fprintf( stderr, "Failed to open named pipe.\n");
                return -1;
        }

        char cn_ion_message[message_size];
        memset( cn_ion_message, 0, message_size);
        strcpy( cn_ion_message, "Hello World!");
        ssize_t bytes_written = 0;

        while (bytes_written < message_size) {

                rc = write( cn_ion_pipefd, cn_ion_message+bytes_written, message_size-bytes_written);
                if (rc < 0) {
                        fprintf( stderr, "Failed to write to named pipe.\n");
                        return -1;
                }
                bytes_written += rc;
        }

        char ion_cn_message[message_size];
        memset( ion_cn_message, 0, message_size);
        ssize_t bytes_read = 0;

        while( bytes_read < message_size) {

                rc = read( ion_cn_pipefd, ion_cn_message+bytes_read, message_size-bytes_read);
                if (rc < 0) {
                        fprintf( stderr, "Failed to read from named pipe.\n");
                        return -1;
                }
                bytes_read += rc;
        }

        fprintf( stdout, "Received message from ION via named pipe: '%s'\n", ion_cn_message);
        close( cn_ion_pipefd);
        close( ion_cn_pipefd);
}

while the following code is executed on the IO node side:

#include "common.h"

int main() {

        int rc;

        // Note that mkfifo does not work on BG/Q compute nodes; thus, the IO node has to do the mkfifo call.

        rc = mkfifo( cn_ion_pipe_name, 0666);
        rc |= mkfifo( ion_cn_pipe_name, 0666);

        if (rc) {
                fprintf( stderr, "Failed to create named pipe.\n");
                return -1;
        }

        int cn_ion_pipefd = open( cn_ion_pipe_name, O_RDONLY); // this pipe is used for receiving data from the CN 
        int ion_cn_pipefd = open( ion_cn_pipe_name, O_WRONLY); // this pipe is used for sending data to the CN 

        if ((cn_ion_pipefd < 0) || (ion_cn_pipefd < 0)) {
                fprintf( stderr, "Failed to open named pipe.\n");
                return -1;
        }

        char ion_cn_message[message_size];
        memset( ion_cn_message, 0, message_size);
        ssize_t bytes_read = 0;

        while (bytes_read < message_size) {

                rc = read( cn_ion_pipefd, ion_cn_message+bytes_read, message_size-bytes_read);
                if (rc < 0) {
                        fprintf( stderr, "Failed to read from named pipe.\n");
                        return -1;
                }
                bytes_read += rc;
        }

        char cn_ion_message[message_size];
        memset( cn_ion_message, 0, message_size);
        strcpy( cn_ion_message, "Hello World!");
        ssize_t bytes_written = 0;

        while (bytes_written < message_size) {

                rc = write( ion_cn_pipefd, cn_ion_message+bytes_written, message_size-bytes_written);
                if (rc < 0) {
                        fprintf( stderr, "Failed to write to named pipe.\n");
                        return -1;
                }
                bytes_written += rc;
        }

        close( cn_ion_pipefd);
        close( ion_cn_pipefd);
        unlink( cn_ion_pipe_name);
        unlink( ion_cn_pipe_name);
}

common.h:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>

const char* cn_ion_pipe_name = "/tmp/cn_ion_pipe";
const char* ion_cn_pipe_name = "/tmp/ion_cn_pipe";

const size_t message_size = 512;

The makefile is straightforward, no non-standard flags are needed. Note that, while the compute node application must be compiled with mpigcc/mpixlc, the IO node application must be compiled with gcc/xlc (i.e. no MPI wrapper). This is no peculiarity of named pipes, but true for compilation on BG/Q machines in general.

The executables are submitted using the following LoadLeveler job script (adapted to the user's needs, of course), with cn.x being the CN application executable, ion.x its ION twin:

# @ job_name = named_pipe_test
# @ output = named_pipe_test.out
# @ error = named_pipe_test.err
# @ notification = error
# @ notify_user = n.vandenbergen@fz-juelich.de
# @ wall_clock_limit = 00:10:00
# @ job_type = bluegene
# @ bg_size = 32 
# @ queue

runjob --np 1 --start-tool /homeb/zam/vandenb/BGAS_named_pipes_example/ion.x : /homeb/zam/vandenb/BGAS_named_pipes_example/cn.x
Last modified 9 years ago Last modified on 01/22/15 13:54:14
Note: See TracWiki for help on using the wiki.