Session.Call做了什么
创建RpcId用于查找,创建Rpc任务等待消息返回
/// <summary>
/// Sends <paramref name="request"/> over this session as an RPC and waits for its response.
/// </summary>
/// <param name="self">The session the request is sent on.</param>
/// <param name="request">The request to send; its <c>RpcId</c> is overwritten with a freshly allocated id.</param>
/// <returns>The value produced by awaiting the pending call's <c>Tcs</c>.</returns>
public static async ETTask<IResponse> Call(this Session self, IRequest request)
{
    // Take the next id from the static counter shared through Session.RpcId.
    int nextRpcId = ++Session.RpcId;
    RpcInfo pendingCall = new RpcInfo(request);

    // Stamp the request and remember the pending call under the same id,
    // so a later lookup by RpcId can find it.
    request.RpcId = nextRpcId;
    self.requestCallbacks[nextRpcId] = pendingCall;

    self.Send(request);

    IResponse result = await pendingCall.Tcs;
    return result;
}
把消息发送到NetServices的队列,等待网络线程处理
// Hand the message to NetServices, identified by this session's service id and
// the session's own id (passed as the channel id) plus the target actor id.
NetServices.Instance.SendMessage(self.ServiceId, self.Id, actorId, message);
//压入队列
/// <summary>
/// Queues a send request: wraps the arguments in a <c>NetOperator</c> tagged
/// <c>NetOp.SendMessage</c> and enqueues it on <c>netThreadOperators</c>.
/// </summary>
/// <param name="serviceId">Id of the service that should send the message.</param>
/// <param name="channelId">Id of the channel to send on.</param>
/// <param name="actorId">Actor id carried along with the message.</param>
/// <param name="message">The message object to send.</param>
public void SendMessage(int serviceId, long channelId, long actorId, object message)
{
    NetOperator sendOp = new NetOperator()
    {
        Op = NetOp.SendMessage,
        ServiceId = serviceId,
        ChannelId = channelId,
        ActorId = actorId,
        Object = message
    };

    // Nothing is sent here; the operator is only queued for whoever drains netThreadOperators.
    this.netThreadOperators.Enqueue(sendOp);
}
在RunNetThreadOperator取出消息发送
// Resolve the service named by the queued operation.
AService service = this.Get(op.ServiceId);
// If no such service exists, the operation is skipped without any error.
if (service != null)
{
// Forward the queued channel id, actor id and message object to the service.
service.Send(op.ChannelId, op.ActorId, op.Object);
}
接收消息后调用读取回调
// Look up the read callback stored for this operation's service id;
// when none is found, return without processing the message.
if (!this.readCallback.TryGetValue(op.ServiceId, out var action))
{
return;
}
// Pass the received channel id, actor id and message object to the callback.
action.Invoke(op.ChannelId, op.ActorId, op.Object);
break;
服务器接收到消息后会先创建会话
/// <summary>
/// Creates a <c>Session</c> child for a newly accepted channel and records the remote address.
/// Unless the domain scene is a benchmark server, timeout and idle-check components are attached.
/// </summary>
/// <param name="self">The server network component that owns the new session.</param>
/// <param name="channelId">Channel id, also used as the id of the created session.</param>
/// <param name="ipEndPoint">Remote endpoint of the accepted connection.</param>
private static void OnAccept(this NetServerComponent self, long channelId, IPEndPoint ipEndPoint)
{
    Session session = self.AddChildWithId<Session, int>(channelId, self.ServiceId);
    session.RemoteAddress = ipEndPoint;

    // Benchmark servers get a bare session without the watchdog components.
    if (self.DomainScene().SceneType == SceneType.BenchmarkServer)
    {
        return;
    }

    // Original author's note: with this component attached the session is removed after
    // about 5 seconds, so it has to be removed once the client has been verified. Its purpose
    // is to stop clients that connect but never send a message or authenticate.
    // (Timing comes from the original comment; it is not visible in this code.)
    session.AddComponent<SessionAcceptTimeoutComponent>();

    // Original author's note: received messages are checked every 2 seconds and the
    // connection is dropped after 10 seconds of silence. (Timings not visible in this code.)
    session.AddComponent<SessionIdleCheckerComponent>();
}
服务器读取消息触发NetServerComponentSystem的OnRead,分发消息
/// <summary>
/// Read callback of the server network component: finds the session for the channel,
/// updates its last-receive time, logs the message and publishes a
/// <c>NetServerComponentOnRead</c> event carrying the session and the message.
/// </summary>
/// <param name="self">The server network component that owns the sessions.</param>
/// <param name="channelId">Id of the channel (and child session) the message arrived on.</param>
/// <param name="actorId">Actor id delivered with the message; not used in this method.</param>
/// <param name="message">The received message object.</param>
private static void OnRead(this NetServerComponent self, long channelId, long actorId, object message)
{
    Session session = self.GetChild<Session>(channelId);

    // A message for an unknown channel is dropped.
    if (session != null)
    {
        session.LastRecvTime = TimeHelper.ClientNow();
        OpcodeHelper.LogMsg(self.DomainZone(), message);

        var readEvent = new NetServerComponentOnRead()
        {
            Session = session,
            Message = message
        };
        EventSystem.Instance.Publish(Root.Instance.Scene, readEvent);
    }
}
//真正分发消息的事件
namespace ET.Server
{
    /// <summary>
    /// Handles <c>NetServerComponentOnRead</c> events and routes the received message:
    /// responses complete a pending RPC on the session, actor-location messages are forwarded
    /// through <c>ActorLocationSenderComponent</c>, and everything else goes to
    /// <c>MessageDispatcherComponent</c>.
    /// </summary>
    [Event(SceneType.Process)]
    public class NetServerComponentOnReadEvent: AEvent<NetServerComponentOnRead>
    {
        /// <summary>
        /// Dispatches a single received message according to the interface it implements.
        /// </summary>
        /// <param name="scene">Scene the event was published on; not used here.</param>
        /// <param name="args">Event payload holding the session and the received message.</param>
        protected override async ETTask Run(Scene scene, NetServerComponentOnRead args)
        {
            Session session = args.Session;
            object message = args.Message;

            // A response belongs to an RPC this session issued earlier.
            if (message is IResponse response)
            {
                session.OnResponse(response);
                return;
            }

            // Decide by message interface whether this is an actor message; each interface gets
            // its own handling. For example, to forward to a Chat scene one could add an
            // IChatMessage interface (suggestion from the original comment).
            // The case order matters: it is kept exactly as in the original.
            switch (message)
            {
                // The gate session received an actor RPC: call the actor first,
                // then relay the actor's response back to the client.
                case IActorLocationRequest locationRequest:
                {
                    // NOTE(review): GetComponent's result is dereferenced without a null check;
                    // confirm a SessionPlayerComponent is always present at this point.
                    long playerId = session.GetComponent<SessionPlayerComponent>().PlayerId;

                    // Keep the client's rpc id so it can be restored on the response.
                    int clientRpcId = locationRequest.RpcId;
                    long instanceIdBeforeCall = session.InstanceId;

                    IResponse actorResponse = await ActorLocationSenderComponent.Instance.Call(playerId, locationRequest);
                    actorResponse.RpcId = clientRpcId;

                    // The session may have been disconnected while awaiting, so only reply
                    // when its InstanceId is unchanged.
                    if (session.InstanceId == instanceIdBeforeCall)
                    {
                        session.Send(actorResponse);
                    }
                    return;
                }
                case IActorLocationMessage locationMessage:
                {
                    long playerId = session.GetComponent<SessionPlayerComponent>().PlayerId;
                    ActorLocationSenderComponent.Instance.Send(playerId, locationMessage);
                    return;
                }
                // IActorRequest / IActorMessage dispatch is currently unused;
                // add handling here if it is needed.
                case IActorRequest _:
                case IActorMessage _:
                {
                    return;
                }
                default:
                {
                    // Not an actor message.
                    MessageDispatcherComponent.Instance.Handle(session, message);
                    return;
                }
            }
        }
    }
}
客户端读取消息触发NetClientComponentSystem的OnRead,通知原有会话消息返回了
/// <summary>
/// Read callback of the client network component: finds the session for the channel,
/// updates its last-receive time, logs the message and publishes a
/// <c>NetClientComponentOnRead</c> event carrying the session and the message.
/// </summary>
/// <param name="self">The client network component that owns the sessions.</param>
/// <param name="channelId">Id of the channel (and child session) the message arrived on.</param>
/// <param name="actorId">Actor id delivered with the message; not used in this method.</param>
/// <param name="message">The received message object.</param>
private static void OnRead(this NetClientComponent self, long channelId, long actorId, object message)
{
    // Locate the session that owns this channel.
    Session session = self.GetChild<Session>(channelId);

    // A message for an unknown channel is dropped.
    if (session != null)
    {
        session.LastRecvTime = TimeHelper.ClientNow();
        OpcodeHelper.LogMsg(self.DomainZone(), message);

        // Raise the NetClientComponentOnRead event.
        var readEvent = new NetClientComponentOnRead()
        {
            Session = session,
            Message = message
        };
        EventSystem.Instance.Publish(Root.Instance.Scene, readEvent);
    }
}
/// <summary>
/// Handles <c>NetClientComponentOnRead</c> events: a response completes the pending RPC on the
/// session, any other message is passed to <c>MessageDispatcherComponent</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the server-side handler in this document is annotated with
/// <c>[Event(SceneType.Process)]</c>; no such attribute is shown here. Confirm whether the
/// excerpt omitted it or the handler really is unannotated.
/// </remarks>
public class NetClientComponentOnReadEvent : AEvent<NetClientComponentOnRead>
{
    /// <summary>
    /// Routes a single message received on a client session.
    /// </summary>
    /// <param name="scene">Scene the event was published on; not used here.</param>
    /// <param name="args">Event payload holding the session and the received message.</param>
    protected override async ETTask Run(Scene scene, NetClientComponentOnRead args)
    {
        Session session = args.Session;
        object message = args.Message;

        if (message is IResponse response)
        {
            // A response belongs to an RPC this session issued earlier.
            session.OnResponse(response);
        }
        else
        {
            // An ordinary message or an RPC request message.
            MessageDispatcherComponent.Instance.Handle(session, message);
            await ETTask.CompletedTask;
        }
    }
}
会话设置请求结果
/// <summary>
/// Completes the pending RPC that matches <paramref name="response"/>: the entry stored under
/// the response's <c>RpcId</c> is removed and its <c>Tcs</c> receives either the response or an
/// exception. A response with no matching entry is ignored.
/// </summary>
/// <param name="self">The session holding the pending request callbacks.</param>
/// <param name="response">The response that arrived for an earlier request.</param>
public static void OnResponse(this Session self, IResponse response)
{
    if (!self.requestCallbacks.TryGetValue(response.RpcId, out var pendingCall))
    {
        // Nothing is waiting for this rpc id.
        return;
    }

    // Remove the entry before completing it, so it cannot be found a second time.
    self.requestCallbacks.Remove(response.RpcId);

    if (ErrorCore.IsRpcNeedThrowException(response.Error))
    {
        // Error codes flagged by ErrorCore are delivered to the awaiting caller as an exception.
        Exception rpcFailure = new Exception($"Rpc error, request: {pendingCall.Request} response: {response}");
        pendingCall.Tcs.SetException(rpcFailure);
    }
    else
    {
        pendingCall.Tcs.SetResult(response);
    }
}
No Comments