Linux System V - 消息队列与责任链模式

news/2025/2/21 5:24:37

概念

消息队列是一种以消息为单位的进程间通信机制,允许一个或多个进程向队列中发送消息,同时允许一个或多个进程从队列中接收消息。消息队列由内核维护,具有以下特点:

  • 异步通信:发送方和接收方不需要同时运行,消息会存储在队列中。

  • 消息优先级:支持消息的优先级排序。

  • 消息分类:每条消息都有一个类型,可以根据类型选择性地接收。

  • 持久性:即使发送方或接收方意外退出,消息仍然保留在队列中。

内核结构

在 /usr/include/linux/msg.h 中,可以查看到消息队列的结构如下:

struct msqid_ds {
struct ipc_perm msg_perm;
struct msg msg_first; / first message on queue,unused */
struct msg msg_last; / last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

其中 struct ipc_perm 是 System V IPC(进程间通信)机制中用于描述 IPC 对象(如消息队列、信号量集或共享内存段)权限和所有者信息的数据结构。它定义了谁可以访问和操作这些 IPC 资源,以及它们的访问权限,其中该结构体位于 /usr/include/linux/ipc.h 内,定义如下:

struct ipc_perm {
key_t __key; /* Key supplied to xxxget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};

接口说明

以下函数均包含在头文件 <sys/ipc.h> 和 <sys/msg.h> 中

msgget

功能:调用创建或获取一个消息队列

int msgget(key_t key, int msgflg);
  • key:唯一标识消息队列的键值,通常通过 ftok 函数生成。

我们可以看到 key 的类型为 key_t,实际上也是一个整数,不过要保证其数值的唯一性,要获取该参数 key 我们需要使用 ftok 函数,它会根据一个文件路径和一个项目标识符(proj_id)通过一系列算法生成一个几乎唯一的键值。

  • pathname 是一个指向存在的文件路径的指针,实际上可以随便写。
  • proj_id 是一个整数,通常是一个字符常量,其低8位被用于生成键值,实际上可以随便写。

  • msgflg:指定消息队列的权限和行为,由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的。

其中最为常用的shmflg如下:

  • IPC_CREAT:如果共享内存段不存在,则创建一个新的共享内存段,存在则返回 shmid。
  • IPC_EXCL:与IPC_CREAT一起使用时,确保创建新的共享内存段,如果共享内存段已存在则失败。
  • 权限位:shmflg的低9位用于设置共享内存段的权限,与文件系统的权限位相同。
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
int main()
{
    key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);
    int msgid = msgget(key, IPC_CREAT);
    return 0;
}

msgsnd

功能:用于向消息队列中发送消息。

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数
  • msgid:消息队列的 ID(由 msgget() 返回)。

  • msgp:指向消息结构的指针。消息结构的第一个字段必须是 long 类型,表示消息类型,如下。

struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
// 以⼀个long int⻓整数开始,接收者函数将利⽤⽤这个⻓整数确定消息的类型
  • msgsz:消息的大小(不包括消息类型字段)。

  • msgflg:标志位,常用值:

    • 0:阻塞模式,直到消息被发送。

    • IPC_NOWAIT:非阻塞模式,如果队列已满,立即返回。

返回值
  • 成功时返回 0

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{
    long msg_type;
    char *msg_text;
};
int main()
{
    key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);
    int msgid = msgget(key, IPC_CREAT);

    msg_buffer sendbuf = {1, "hello world!"};
    msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);
    return 0;
}

msgrcv

功能:用于从消息队列中接收消息。

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数
  • msgid:消息队列的 ID。

  • msgp:指向消息结构的指针。

  • msgsz:消息的最大接收大小(不包括消息类型字段)。

  • msgtyp:指定接收的消息类型:

    • 0:接收队列中第一个消息。

    • >0:接收指定类型的消息。

    • <0:接收类型小于等于该值的最小类型消息。

  • msgflg:标志位:

    • 0:阻塞模式,直到消息可用。

    • IPC_NOWAIT:非阻塞模式,如果队列为空,立即返回。

返回值
  • 成功时返回实际接收的消息大小。

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{
    long msg_type;
    char *msg_text;
};
int main()
{
    key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);
    int msgid = msgget(key, IPC_CREAT);

    msg_buffer sendbuf = {1, "hello world!"};
    msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);

    msg_buffer receivebuf;
    msgrcv(msgid, receivebuf.msg_text, 1024, 2, 0);
    cout << receivebuf.msg_text << endl;
    return 0;
}

msgctl

功能:用于对消息队列进行控制操作,如获取状态、设置属性或删除队列。

int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数
  • msgid:消息队列的 ID。

  • cmd:指定操作命令:

    • IPC_STAT:获取消息队列的状态信息,存储到 buf 中。

    • IPC_SET:设置消息队列的权限和属性。

    • IPC_RMID:删除消息队列。

  • buf:用于存储或修改消息队列的状态信息。

返回值
  • 成功时返回 0

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{
    long msg_type;
    char *msg_text;
};
int main()
{
    key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);
    int msgid = msgget(key, IPC_CREAT);

    msg_buffer sendbuf = {1, "hello world!"};
    msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);

    msg_buffer receivebuf;
    msgrcv(msgid, receivebuf.msg_text, 1024, 2, 0);
    cout << receivebuf.msg_text << endl;

    // 删除消息队列
    if (receivebuf.msg_text == "quit")
    {
        msgctl(msgid, IPC_RMID, nullptr) == -1;
    }
    return 0;
}

基本通信示例

MessageQueue.hpp

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
using namespace std;

#define SIZE 1024
#define PATH "/home/lbk/lesson27/MessageQueue.hpp"
#define PROJETID 6666
#define CREATE_MESSAGEQUEUE (IPC_CREAT | IPC_EXCL | 0666)
#define GET_MESSAGEQUEUE (IPC_CREAT)
#define SERVERTYPE 1
#define CLIENTTYPE 2
enum
{
    KEY_ERROR = 1,
    MSGGET_ERRO,
};
struct msg_buffer
{
    long msg_type;
    char msg_text[SIZE];
};
class MessageQueue
{
public:
    void Create(int flag)
    {
        key_t key = ftok(PATH, PROJETID);
        if (key < 0)
        {
            cerr << "ftok error" << endl;
            exit(KEY_ERROR);
        }
        _msgid = msgget(key, flag);
        if (_msgid < 0)
        {
            cerr << "msgget error" << endl;
            exit(MSGGET_ERRO);
        }
    }
    bool SendMessage(const string &in, long type)
    {
        msg_buffer buf;
        buf.msg_type = type;
        strcpy(buf.msg_text, in.c_str());
        int n = msgsnd(_msgid, &buf, SIZE, 0);
        if (n < 0)
        {
            cout << "SendMessage error" << endl;
            return false;
        }
        return true;
    }
    bool ReceiveMessage(string *out, long type)
    {
        msg_buffer buf;
        int n = msgrcv(_msgid, &buf, SIZE, type, 0);
        if (n < 0)
        {
            cout << "ReceiveMessage error" << endl;
            return false;
        }
        buf.msg_text[n] = 0;
        *out = buf.msg_text;
        return true;
    }
    void Destroy()
    {
        msgctl(_msgid, IPC_RMID, nullptr);
    }

private:
    int _msgid;
};

class MessageQueueServer : public MessageQueue
{
public:
    MessageQueueServer()
    {
        Create(CREATE_MESSAGEQUEUE);
    }
    ~MessageQueueServer()
    {
        Destroy();
    }
};

class MessageQueueClient : public MessageQueue
{
public:
    MessageQueueClient()
    {
        Create(GET_MESSAGEQUEUE);
    }
};

server.cc

#include "MessageQueue.hpp"
int main()
{
    MessageQueueServer server;
    string message;
    while (message != "quit")
    {
        server.ReceiveMessage(&message, CLIENTTYPE);
        cout << "server receive: " << message << endl;
    }
    return 0;
}

client.cc

#include "MessageQueue.hpp"
int main()
{
    MessageQueueClient client;
    string message;
    while (message != "quit")
    {
        getline(cin, message);
        client.SendMessage(message, CLIENTTYPE);
    }
    return 0;
}

makefile

.PHONY:all
all:client server
client:client.cc
	g++ -o $@ $^ -std=c++20
server:server.cc
	g++ -o $@ $^ -std=c++20
.PHONY:clean
clean:
	rm -f client server

http://www.niftyadmin.cn/n/5860207.html

相关文章

58同城深度学习推理平台:基于Istio的云原生网关实践解析

在当今数字化时代&#xff0c;深度学习技术的快速发展为各行各业带来了革命性的变化。作为国内领先的分类信息网站&#xff0c;58同城一直致力于通过技术创新提升服务质量和用户体验。近期&#xff0c;58同城AI Lab推出了一项重要的技术革新——基于Istio的云原生网关深度学习推…

用DeepSeek来帮助学习three.js加载3D太极模形

画一个平面的太极图是很容易&#xff0c;要实现3D的应该会很难 一、参考3D模形效果 看某网页看到一个效果&#xff0c;像一个3D太极球&#xff0c;觉得挺有趣&#xff0c;挺解压的&#xff0c;想进一步去了解下这是如何实现 效果&#xff1a; 链接地址&#xff1a; http://www.…

python:多重继承、MRO(方法解析顺序)

在 Python 中&#xff0c;当类存在多重继承时&#xff0c;方法的调用顺序由 方法解析顺序&#xff08;Method Resolution Order, MRO&#xff09; 决定。 Python 使用 C3线性化算法 来确定类的继承顺序&#xff08;MRO&#xff09;&#xff0c;其核心规则是&#xff1a; 子类优…

第六届全球数据库大赛:PolarDB TPC-C性能优化挑战赛方案分享(一)--参数调优

文章目录 前言一、基本参数二、提分参数三、关键参数四、遗漏参数总结前言 今年由于阿里承办了全国大学生计算机系统能力大赛PolarDB数据库创新设计赛,本以为数据库大赛会取消,直到十一左右比赛才姗姗来迟,所以赛程较往年缩短了一个月左右,初赛和复赛都只有一个月时间,时…

跟着 Lua 5.1 官方参考文档学习 Lua (5)

文章目录 2.10 – Garbage Collection2.10.1 – Garbage-Collection Metamethods2.10.2 – Weak Tables 2.10 – Garbage Collection Lua performs automatic memory management. This means that you have to worry neither about allocating memory for new objects nor abo…

如何调用 DeepSeek API:详细教程与示例

目录 一、准备工作 二、DeepSeek API 调用步骤 1. 选择 API 端点 2. 构建 API 请求 3. 发送请求并处理响应 三、Python 示例&#xff1a;调用 DeepSeek API 1. 安装依赖 2. 编写代码 3. 运行代码 四、常见问题及解决方法 1. API 调用返回 401 错误 2. API 调用返回…

工程项目管理系统(源码+文档+部署+讲解)

本文将深入解析“工程项目管理系统”的项目&#xff0c;探究其架构、功能以及技术栈&#xff0c;并分享获取完整源码的途径。 系统概述 工程项目管理系统是一个综合性的管理系统&#xff0c;旨在为工程项目的全生命周期提供全面的支持。该系统包括管理端、小程序、信息中心、…

jvm中各个参数的理解

MEMORY - MANAGERS 定义 MEMORY - MANAGERS即内存管理器&#xff0c;它是操作系统或软件系统中负责管理计算机内存资源的组件。从本质上来说&#xff0c;它是一种软件机制&#xff0c;旨在协调计算机系统中内存的分配、使用和回收等操作&#xff0c;确保系统能够高效、稳定地…