- 在Python中使用Linux的IPC通信
- 说明
- 目的
- Semaphore C代码封装
- 源代码: linux_ipc_sem.c
- 头文件 linux_ipc_sem.h
- 编译生成库文件
- Semaphore Python API封装
- Python API: linux_ipc_pyapi_sem.py
- linux_ipc_pyapi_sem.py 测试: linux_ipc_pyapi_sem_test.py
- Python进程与Python进程间通信测试
- Python进程与C进程间通信测试
- 结论
Linux下的IPC进程间通信包括有:消息队列,管道,共享内存,信号量,socket等。
在寻找信号量通信的使用库时,仅仅找到了multiprocessing这个库,并没有发现能够直接和Linux C中的Semaphore直接通信的方法,于是自己动手封装了一遍库,使用python调用C动态库的方式实现。
如果有已经编写好的稳定的库,还请留言告知~。
目的由于工作内容原因,需要实现两个进程间通信,并且一个进程是使用C编写的,另一个是Python编写,进程间的通信需要测试下使用共享内存的效率,已经实现的有C进程与C进程间的共享内存传输测试,为了保证操作的唯一性,必须使用信号量对共享内存中的数据进行保护,但是在使用Python实现时,发现没有找到可以和C进程信号量通信的方法,于是采用本办法,将C的信号量实现封装一层,再使用Python调用。
Semaphore C代码封装 源代码: linux_ipc_sem.c
#include "linux_ipc_sem.h"
void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value)
{
sem_t* sem_flag = NULL;
sem_flag = sem_open(__name, __oflag, (mode_t)mode, value);
if (NULL == sem_flag)
{
fprintf(stderr, "failed to open semaphore: %sn", __name);
return NULL;
}
return (void*)sem_flag;
}
int py_sem_close(void* sem_flag)
{
int ret_flag = -1;
ret_flag = sem_close((sem_t*)sem_flag);
return ret_flag;
}
int py_sem_unlink(const char *__name)
{
int ret_flag = -1;
ret_flag = sem_unlink(__name);
return ret_flag;
}
int py_sem_wait(void* sem_flag)
{
int ret_flag = -1;
ret_flag = sem_wait((sem_t*)sem_flag);
return ret_flag;
}
int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs)
{
int ret_flag = -1;
struct timespec delay_time = {0};
delay_time.tv_sec = secs;
delay_time.tv_nsec = nsecs;
ret_flag = sem_timedwait((sem_t*)sem_flag, &delay_time);
return ret_flag;
}
int py_sem_trywait(void* sem_flag)
{
int ret_flag = -1;
ret_flag = sem_trywait((sem_t*)sem_flag);
return ret_flag;
}
int py_sem_post(void* sem_flag)
{
int ret_flag = -1;
ret_flag = sem_post((sem_t*)sem_flag);
return ret_flag;
}
int py_sem_getvalue(void* sem_flag)
{
int ret_flag = -1;
int val_addr = -1;
ret_flag = sem_getvalue((sem_t*)sem_flag, &val_addr);
if(ret_flag < 0)
{
fprintf(stderr, "failed to get sem valn");
}
return val_addr;
}
头文件 linux_ipc_sem.h
#ifndef LINUX_IPC_SEM_H_ #define LINUX_IPC_SEM_H_ #include编译生成库文件#include #include #include #include #include #include #include #include #include extern void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value); extern int py_sem_close(void* sem_flag); extern int py_sem_unlink(const char *__name); extern int py_sem_wait(void* sem_flag); extern int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs); extern int py_sem_trywait(void* sem_flag); extern int py_sem_post(void* sem_flag); extern int py_sem_getvalue(void* sem_flag); #endif
gcc -c -FPIC -o linux_ipc_sem.o linux_ipc_sem.c gcc -shared -o liblinux_ipc_sem_pyapi.so linux_ipc_sem.o -lpthread
生成的库即为封装后的函数库
Semaphore Python API封装在python中调用C封装的函数库,将这些函数的使用封装成一个类
Python API: linux_ipc_pyapi_sem.pyfrom ctypes import *
import os
import sys
class linux_ipc_semaphore_pyapi:
library = cdll.LoadLibrary("./liblinux_ipc_sem_pyapi.so")
#liblinux_ipc_for_python_share
sem_flag = 0
## sem_name: 信号量名称
## oflag: 模式,参考man sem_open, python使用os库中的变量
## mode: 权限,使用8进制表示,python中使用0o开头表示8进制
## value: 信号量初始值,默认为0
def sem_open(self, sem_name, oflag, mode, value):
self.library.py_sem_open.argtypes = [c_char_p, c_int, c_uint, c_uint]
self.library.py_sem_open.restype = c_void_p
name = create_string_buffer(sem_name.encode('utf-8'))
self.sem_flag = self.library.py_sem_open(name, c_int(oflag), c_uint(mode), c_uint(value))
return self.sem_flag
def sem_close(self, sem_flag):
self.library.py_sem_close.argtypes = [c_void_p,]
self.library.py_sem_close.restype = c_int
flag = self.library.py_sem_close(c_void_p(sem_flag))
return flag
def sem_unlink(self, sem_name):
self.library.py_sem_unlink.argtypes = [c_char_p,]
self.library.py_sem_unlink.restype = c_int
name = create_string_buffer(sem_name.encode('utf-8'))
flag = self.library.py_sem_unlink(name)
return flag
def sem_wait(self, sem_flag):
self.library.py_sem_wait.argtypes = [c_void_p,]
self.library.py_sem_wait.restype = c_int
flag = self.library.py_sem_wait(c_void_p(sem_flag))
return flag
def sem_timedwait(self, sem_flag, time_secs, time_nsecs):
self.library.py_sem_timedwait.argtypes = [c_void_p, c_int64, c_int64]
self.library.py_sem_timedwait.restype = c_int
flag = self.library.py_sem_timedwait(c_void_p(sem_flag), c_int64(time_secs), c_int64(time_nsecs))
return flag
def sem_trywait(self, sem_flag):
self.library.py_sem_trywait.argtypes = [c_void_p,]
self.library.py_sem_trywait.restype = c_int
flag = self.library.py_sem_trywait(c_void_p(sem_flag))
return flag
def sem_post(self, sem_flag):
self.library.py_sem_post.argtypes = [c_void_p,]
self.library.py_sem_post.restype = c_int
flag = self.library.py_sem_post(c_void_p(sem_flag))
return flag
def sem_getvalue(self, sem_flag):
self.library.py_sem_getvalue.argtypes = [c_void_p]
self.library.py_sem_getvalue.restype = c_int
ret_val = self.library.py_sem_getvalue(c_void_p(sem_flag))
if ret_val < 0:
print("failed to get sem val")
return ret_val
linux_ipc_pyapi_sem.py 测试: linux_ipc_pyapi_sem_test.py
对封装的函数进行测试
import os
import time
import sys
from ctypes import *
from linux_ipc_pyapi_sem import *
global_sem_name = "/test1"
global_sem_test_count = 100000
def linux_ipc_sem_pub():
global global_sem_name
global global_sem_test_count
print("pub")
sem_test = linux_ipc_semaphore_pyapi()
sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
if sem_flag == 0:
print("failed to create sem: ", global_sem_name)
return -1
for i in range(0, global_sem_test_count):
flag = sem_test.sem_post(sem_flag)
if flag < 0:
print("sem post failed")
sem_test.sem_close(sem_flag)
sem_test.sem_unlink(global_sem_name)
return -1
time.sleep(0.030)
sem_test.sem_close(sem_flag)
sem_test.sem_unlink(global_sem_name)
return 0
def linux_ipc_sem_sum():
global global_sem_name
global global_sem_test_count
print("sum")
sem_val = 10
sem_test = linux_ipc_semaphore_pyapi()
sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
if sem_flag == 0:
print("failed to create sem: ", global_sem_name)
return -1
for i in range(0, global_sem_test_count):
flag = sem_test.sem_wait(sem_flag)
if flag < 0:
print("sem wait failed")
sem_test.sem_close(sem_flag)
sem_test.sem_unlink(global_sem_name)
return -1
flag = sem_test.sem_getvalue(sem_flag)
print("[",i,"],sem val",flag)
if flag < 0:
print("sem getval failed")
sem_test.sem_close(sem_flag)
sem_test.sem_unlink(global_sem_name)
return -1
sem_test.sem_close(sem_flag)
sem_test.sem_unlink(global_sem_name)
return 0
if __name__ == "__main__":
selec_flag = int(sys.argv[1])
print(selec_flag)
if selec_flag == 1:
linux_ipc_sem_pub()
else:
linux_ipc_sem_sum()
Python进程与Python进程间通信测试
测试脚本:
# 接收端 python3 linux_ipc_pyapi_sem_test.py 2 # 发送端 python3 linux_ipc_pyapi_sem_test.py 1
测试运行无误,基本可以使用
Python进程与C进程间通信测试C进程测试代码:
#includestatic char *sem_name = "/test1"; const int test_count = 100000; int32_t linux_ipc_sem_pub() { sem_t *flag = NULL; flag = py_sem_open(sem_name, O_CREAT, 0666, 0); int i = 0; for(i = 0; i < test_count; i++) { py_sem_post(flag); usleep(30000); } if (NULL == flag) { fprintf(stderr, "failed to open semaphore: %sn", strerror(errno)); exit(EXIT_FAILURE); } py_sem_close(flag); py_sem_unlink(sem_name); return 0; } int32_t linux_ipc_sem_sum() { int32_t i = 0; int32_t sem_val = 0; sem_t *flag = NULL; flag = py_sem_open(sem_name,O_CREAT, 0666, 0); if (NULL == flag) { fprintf(stderr, "failed to open semaphore: %sn", strerror(errno)); exit(EXIT_FAILURE); } for (i = 0; i < test_count; i++) { py_sem_wait(flag); sem_val = py_sem_getvalue(flag); fprintf(stdout, "[%3d]semaphore value : %dn", i, sem_val); } return 0; } int main(int argc, char **argv) { if (argc <= 1) { fprintf(stderr, "please input argvn"); exit(EXIT_FAILURE); } if (1 == atoi(argv[1])) { fprintf(stdout, "publisher !n"); linux_ipc_sem_pub(); } else if(2 == atoi(argv[1])) { fprintf(stdout, "consumer !n"); linux_ipc_sem_sum(); } return 0; }
编译生成运行程序:
# 编译生成C进程 gcc -o linux_ipc_sem_test -llinux_ipc_sem_pyapi # 运行python api测试进程,接收端 python3 linux_ipc_pyapi_sem_test.py 2 # 运行C测试进程,发送端 ./linux_ipc_sem_test 1结论
初步测试,该方法可以实现使用python进程与C进程的信号量通信
如果有问题,请在评论区指正。



