栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

Python——Python中使用linux下的IPC通信

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python——Python中使用linux下的IPC通信

在Python中使用Linux的IPC通信
  • 在Python中使用Linux的IPC通信
    • 说明
    • 目的
    • Semaphore C代码封装
      • 源代码: linux_ipc_sem.c
      • 头文件 linux_ipc_sem.h
      • 编译生成库文件
    • Semaphore Python API封装
      • Python API: linux_ipc_pyapi_sem.py
      • linux_ipc_pyapi_sem.py 测试: linux_ipc_pyapi_sem_test.py
      • Python进程与Python进程间通信测试
      • Python进程与C进程间通信测试
    • 结论

在Python中使用Linux的IPC通信 说明

Linux下的IPC进程间通信包括有:消息队列,管道,共享内存,信号量,socket等。

在寻找信号量通信的使用库时,仅仅找到了multiprocessing这个库,并没有发现能够直接和Linux C中的Semaphore直接通信的方法,于是自己动手封装了一遍库,使用python调用C动态库的方式实现。

如果有已经编写好的稳定的库,还请留言告知~。

目的

由于工作内容原因,需要实现两个进程间通信,并且一个进程是使用C编写的,另一个是Python编写,进程间的通信需要测试下使用共享内存的效率,已经实现的有C进程与C进程间的共享内存传输测试,为了保证操作的唯一性,必须使用信号量对共享内存中的数据进行保护,但是在使用Python实现时,发现没有找到可以和C进程信号量通信的方法,于是采用本办法,将C的信号量实现封装一层,再使用Python调用。

Semaphore C代码封装 源代码: linux_ipc_sem.c

#include "linux_ipc_sem.h"


void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value)
{
	sem_t* sem_flag = NULL;

	sem_flag = sem_open(__name, __oflag, (mode_t)mode, value);

	if (NULL == sem_flag)
	{
		fprintf(stderr, "failed to open semaphore: %sn", __name);
		return NULL;
	}

	return (void*)sem_flag;
}


int py_sem_close(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_close((sem_t*)sem_flag);

	return ret_flag;
}


int py_sem_unlink(const char *__name)
{
	int ret_flag = -1;
	ret_flag = sem_unlink(__name);

	return ret_flag;
}


int py_sem_wait(void* sem_flag)
{
	int ret_flag = -1;
	ret_flag = sem_wait((sem_t*)sem_flag);

	return ret_flag;
}


int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs)
{
	int ret_flag = -1;
	struct timespec delay_time = {0};
	delay_time.tv_sec = secs;
	delay_time.tv_nsec = nsecs;

	ret_flag = sem_timedwait((sem_t*)sem_flag, &delay_time);

	return ret_flag;
}


int py_sem_trywait(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_trywait((sem_t*)sem_flag);

	return ret_flag;
}


int py_sem_post(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_post((sem_t*)sem_flag);

	return ret_flag;
}


int py_sem_getvalue(void* sem_flag)
{
	int ret_flag = -1;
	int val_addr = -1;

	ret_flag = sem_getvalue((sem_t*)sem_flag, &val_addr);
	if(ret_flag < 0)
	{
		fprintf(stderr, "failed to get sem valn");
	}

	return val_addr;
}
头文件 linux_ipc_sem.h

#ifndef LINUX_IPC_SEM_H_
#define LINUX_IPC_SEM_H_



#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 	
#include 
#include 



extern void* py_sem_open(const char *__name, int __oflag, unsigned int  mode, unsigned int value);


extern int py_sem_close(void* sem_flag);


extern int py_sem_unlink(const char *__name);


extern int py_sem_wait(void* sem_flag);


extern int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs);


extern int py_sem_trywait(void* sem_flag);


extern int py_sem_post(void* sem_flag);


extern int py_sem_getvalue(void* sem_flag);

#endif 
编译生成库文件
gcc -c -FPIC -o linux_ipc_sem.o linux_ipc_sem.c 
gcc -shared -o liblinux_ipc_sem_pyapi.so linux_ipc_sem.o -lpthread

生成的库即为封装后的函数库

Semaphore Python API封装

在python中调用C封装的函数库,将这些函数的使用封装成一个类

Python API: linux_ipc_pyapi_sem.py
from ctypes import *
import os
import sys

class linux_ipc_semaphore_pyapi:
    library = cdll.LoadLibrary("./liblinux_ipc_sem_pyapi.so")
#liblinux_ipc_for_python_share
    sem_flag = 0

    ## sem_name: 信号量名称
    ## oflag: 模式,参考man sem_open, python使用os库中的变量
    ## mode: 权限,使用8进制表示,python中使用0o开头表示8进制
    ## value: 信号量初始值,默认为0
    def sem_open(self, sem_name, oflag, mode, value):
        self.library.py_sem_open.argtypes = [c_char_p, c_int, c_uint, c_uint]
        self.library.py_sem_open.restype = c_void_p

        name = create_string_buffer(sem_name.encode('utf-8'))
        self.sem_flag = self.library.py_sem_open(name, c_int(oflag), c_uint(mode), c_uint(value))
        return self.sem_flag

    def sem_close(self, sem_flag):
        self.library.py_sem_close.argtypes = [c_void_p,]
        self.library.py_sem_close.restype = c_int
        
        flag = self.library.py_sem_close(c_void_p(sem_flag))
        return flag
    
    def sem_unlink(self, sem_name):
        self.library.py_sem_unlink.argtypes = [c_char_p,]
        self.library.py_sem_unlink.restype = c_int

        name = create_string_buffer(sem_name.encode('utf-8'))
        flag = self.library.py_sem_unlink(name)
        return flag
    
    def sem_wait(self, sem_flag):
        self.library.py_sem_wait.argtypes = [c_void_p,]
        self.library.py_sem_wait.restype = c_int

        flag = self.library.py_sem_wait(c_void_p(sem_flag))
        return flag
    
    def sem_timedwait(self, sem_flag, time_secs, time_nsecs):
        self.library.py_sem_timedwait.argtypes = [c_void_p, c_int64, c_int64]
        self.library.py_sem_timedwait.restype = c_int

        flag = self.library.py_sem_timedwait(c_void_p(sem_flag), c_int64(time_secs), c_int64(time_nsecs))
        return flag
    
    def sem_trywait(self, sem_flag):
        self.library.py_sem_trywait.argtypes = [c_void_p,]
        self.library.py_sem_trywait.restype = c_int

        flag =  self.library.py_sem_trywait(c_void_p(sem_flag))
        return flag
    
    def sem_post(self, sem_flag):
        self.library.py_sem_post.argtypes = [c_void_p,]
        self.library.py_sem_post.restype = c_int
        
        flag = self.library.py_sem_post(c_void_p(sem_flag))

        return flag
    
    def sem_getvalue(self, sem_flag):
        self.library.py_sem_getvalue.argtypes = [c_void_p]
        self.library.py_sem_getvalue.restype = c_int
        
        ret_val  = self.library.py_sem_getvalue(c_void_p(sem_flag))
        if ret_val < 0:
            print("failed to get sem val")
        return ret_val
linux_ipc_pyapi_sem.py 测试: linux_ipc_pyapi_sem_test.py

对封装的函数进行测试

import os
import time
import sys

from ctypes import *
from linux_ipc_pyapi_sem import *

global_sem_name = "/test1"
global_sem_test_count = 100000
def linux_ipc_sem_pub():
    global global_sem_name
    global global_sem_test_count
    print("pub")
    sem_test = linux_ipc_semaphore_pyapi()
    sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
    if sem_flag == 0:
        print("failed to create sem: ", global_sem_name)
        return -1

    for i in range(0, global_sem_test_count):
        flag = sem_test.sem_post(sem_flag)
        if flag < 0:
            print("sem post failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
        time.sleep(0.030)
    
    sem_test.sem_close(sem_flag)
    sem_test.sem_unlink(global_sem_name)

    return 0

def linux_ipc_sem_sum():
    global global_sem_name
    global global_sem_test_count
    print("sum")
    sem_val = 10
    sem_test = linux_ipc_semaphore_pyapi()
    sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
    if sem_flag == 0:
        print("failed to create sem: ", global_sem_name)
        return -1

    for i in range(0, global_sem_test_count):
        flag = sem_test.sem_wait(sem_flag)
        if flag < 0:
            print("sem wait failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
        flag = sem_test.sem_getvalue(sem_flag)
        print("[",i,"],sem val",flag)
        if flag < 0:
            print("sem getval failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
    
    sem_test.sem_close(sem_flag)
    sem_test.sem_unlink(global_sem_name)

    return 0

if __name__ == "__main__":
    selec_flag = int(sys.argv[1])
    print(selec_flag)
    if selec_flag == 1:
        linux_ipc_sem_pub()
    else:
        linux_ipc_sem_sum()
Python进程与Python进程间通信测试

测试脚本:

# 接收端
python3 linux_ipc_pyapi_sem_test.py 2

# 发送端
python3 linux_ipc_pyapi_sem_test.py 1

测试运行无误,基本可以使用

Python进程与C进程间通信测试

C进程测试代码:


#include 


static char *sem_name = "/test1";
const int test_count = 100000;
int32_t linux_ipc_sem_pub()
{
	sem_t *flag = NULL;

	flag = py_sem_open(sem_name, O_CREAT, 0666, 0);

	int i = 0;
	for(i = 0; i < test_count; i++)
	{
		py_sem_post(flag);
		usleep(30000);
	}
	if (NULL == flag)
	{
		fprintf(stderr, "failed to open semaphore: %sn", strerror(errno));
		exit(EXIT_FAILURE);
	}

	py_sem_close(flag);
	py_sem_unlink(sem_name);

	return 0;
}

int32_t linux_ipc_sem_sum()
{
	int32_t i = 0;
	int32_t sem_val = 0;
	sem_t *flag = NULL;
	flag = py_sem_open(sem_name,O_CREAT, 0666, 0);
	if (NULL == flag)
	{
		fprintf(stderr, "failed to open semaphore: %sn", strerror(errno));
		exit(EXIT_FAILURE);
	}

	for (i = 0; i < test_count; i++)
	{
		py_sem_wait(flag);
		sem_val = py_sem_getvalue(flag);
		fprintf(stdout, "[%3d]semaphore value : %dn", i, sem_val);
	}

	return 0;
}


int main(int argc, char **argv)
{
	

	if (argc <= 1)
	{
		fprintf(stderr, "please input argvn");
		exit(EXIT_FAILURE);
	}

	if (1 == atoi(argv[1]))
	{
		fprintf(stdout, "publisher !n");
		linux_ipc_sem_pub();
	}
	else if(2 == atoi(argv[1]))
	{
		fprintf(stdout, "consumer !n");
		linux_ipc_sem_sum();
	}

	return 0;
}

编译生成运行程序:

# 编译生成C进程
gcc -o linux_ipc_sem_test -llinux_ipc_sem_pyapi

# 运行python api测试进程,接收端
python3 linux_ipc_pyapi_sem_test.py 2

# 运行C测试进程,发送端
./linux_ipc_sem_test 1
结论

初步测试,该方法可以实现使用python进程与C进程的信号量通信

如果有问题,请在评论区指正。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679639.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号