想实现一个单机多个进程间互相调用函数的架构,目前的想法是想做成类似rpc的方式。让所有进程都去监听某一个固定的端口成为服务端,并且同时去连接其他的进程的端口成为客户端。进程一通过调用通用接口发送消息给特定的进程二,要求这个调用可以是同步的也可以是异步的。
class Base {
public:
using callback = void (Base::*)(jsonobj)
Base(string name) : m_strName(name) {
InitBusiness();
}
virtual ~Base() {}
// 调用之后就一直处理同异步消息
// block:是否阻塞调用
bool start(bool block = true) {
RecvMsg();
}
/*
同步调用
name: 进程名(应唯一)
function: 方法名
param: 参数(json对象)
result: 调用结果(json对象)
timeout: 调用超时返回
*/
int SyncCall(string name, string function, jsonobj param, jsonobj& result, int timeout = 10);
/*
异步调用
name: 进程名(应唯一)
function: 方法名
param: 参数(json对象)
callback : 回调-处理调用结果
*/
int AsyncCall(string name, string function, jsonobj param, callback cb);
/*
进程初始化,初始化进程自己的业务
*/
protected:
virtual int InitBusiness() = 0;
private:
const std::string m_strName;
UnixDomainSocketWrapper* m_pCommunicator;
// ......
};
想法:
1、所有进程都继承自Base;
2、进程启动之后一直在接收并处理消息;
3、同步调用通过socket发送消息的方式,将消息发送至指定的进程,阻塞等待对方进程收到消息处理完成返回结果,拿到返回结果返回至调用方进程;
4、维护一个任务队列,将所有的异步调用均封装成一个个任务并将之加入队列,等待被处理,处理方式为同步调用的方式,处理结果通过传入的回调通知调用方进程;
5、缺点是每个进程均需要监听一个端口,并且需要connect其他所有进程所监听的端口,造成性能问题;
6、问题是如何处理多条消息同时到达返回结果时应使用唯一消息ID以及如何处理消息丢失、重传等问题?
最终实现形式:
```c++
//进程一
class CTest1 : public Base {
public:
CTest1(string name) : Base(name) {}
virtual ~CTest1() {}
// 对外接口
int Test1(jsonobj, jsonobj&);
protected:
virtual int InitBusiness() {
// 初始化业务
Business1();
Business2();
std::thread(&CTest1::CycleBusiness, this).detach();
}
private:
// 回调
void OnTest2CB(jsonobj param);
// 业务
void Business1() {
jsonobj param, result;
SyncCall("CTest2", "Test2", param, result); // 同步调用进程二的Test2接口
AsyncCall("CTest2", "Test2", param, OnTest2CB); // 异步调用进程二的Test2接口
}
void Business2();
void CycleBusiness();
};
int main() {
Base* obj = new CTest1("CTest1");
obj->start();
delete obj;
return 0;
}
//进程二
class CTest2 : public Base {
public:
CTest2(string name) : Base(name) {}
virtual ~CTest2() {}
// 对外接口
int Test2(jsonobj, jsonobj&);
protected:
virtual int InitBusiness() {
// 初始化业务
Business1();
Business2();
std::thread(&CTest2::CycleBusiness, this).detach();
}
private:
// 回调
void OnTest1CB(jsonobj param);
void Business1() {
jsonobj param, result;
SyncCall("CTest1", "Test1", param, result); // 同步调用进程一的Test1接口
AsyncCall("CTest1", "Test1", param, OnTest1CB); // 异步调用进程一的Test1接口
}
void Business2();
void CycleBusiness();
};
int main() {
Base* obj = new CTest2("CTest2");
obj->start();
delete obj;
return 0;
}
```