Skip to main content

Session.Call做了什么

创建RpcId用于查找,创建Rpc任务等待消息返回

public static async ETTask<IResponse> Call(this Session self, IRequest request)
{
    int rpcId = ++Session.RpcId;
    RpcInfo rpcInfo = new RpcInfo(request);
    self.requestCallbacks[rpcId] = rpcInfo;
    request.RpcId = rpcId;
    self.Send(request);
    return await rpcInfo.Tcs;
}

把消息发送到NetServices的队列,等待网络线程处理

NetServices.Instance.SendMessage(self.ServiceId, self.Id, actorId, message);

//压入队列
public void SendMessage(int serviceId, long channelId, long actorId, object message)
{
    NetOperator netOperator = new NetOperator() { Op = NetOp.SendMessage, ServiceId = serviceId, ChannelId = channelId, ActorId = actorId, Object = message };
    this.netThreadOperators.Enqueue(netOperator);
}

在RunNetThreadOperator取出消息发送

AService service = this.Get(op.ServiceId);
if (service != null)
{
    service.Send(op.ChannelId, op.ActorId, op.Object);
}

 

接收消息后调用读取回调

if (!this.readCallback.TryGetValue(op.ServiceId, out var action))
{
    return;
}
action.Invoke(op.ChannelId, op.ActorId, op.Object);
break;

服务器接收到消息后会先创建会话

private static void OnAccept(this NetServerComponent self, long channelId, IPEndPoint ipEndPoint)
{
    Session session = self.AddChildWithId<Session, int>(channelId, self.ServiceId);
    session.RemoteAddress = ipEndPoint;

    if (self.DomainScene().SceneType != SceneType.BenchmarkServer)
    {
        // 挂上这个组件,5秒就会删除session,所以客户端验证完成要删除这个组件。该组件的作用就是防止外挂一直连接不发消息也不进行权限验证
        session.AddComponent<SessionAcceptTimeoutComponent>();
        // 客户端连接,2秒检查一次recv消息,10秒没有消息则断开
        session.AddComponent<SessionIdleCheckerComponent>();
    }
}

服务器读取消息触发NetServerComponentSystem的OnRead,分发消息

private static void OnRead(this NetServerComponent self, long channelId, long actorId, object message)
{
    Session session = self.GetChild<Session>(channelId);
    if (session == null)
    {
        return;
    }
    session.LastRecvTime = TimeHelper.ClientNow();

    OpcodeHelper.LogMsg(self.DomainZone(), message);

    EventSystem.Instance.Publish(Root.Instance.Scene, new NetServerComponentOnRead() { Session = session, Message = message });
}


//真正分发消息的事件
namespace ET.Server
{
    [Event(SceneType.Process)]
    public class NetServerComponentOnReadEvent: AEvent<NetServerComponentOnRead>
    {
        protected override async ETTask Run(Scene scene, NetServerComponentOnRead args)
        {
            Session session = args.Session;
            object message = args.Message;

            if (message is IResponse response)
            {
                session.OnResponse(response);
                return;
            }
			
            // 根据消息接口判断是不是Actor消息,不同的接口做不同的处理,比如需要转发给Chat Scene,可以做一个IChatMessage接口
            switch (message)
            {
                case IActorLocationRequest actorLocationRequest: // gate session收到actor rpc消息,先向actor 发送rpc请求,再将请求结果返回客户端
                {
                    long unitId = session.GetComponent<SessionPlayerComponent>().PlayerId;
                    int rpcId = actorLocationRequest.RpcId; // 这里要保存客户端的rpcId
                    long instanceId = session.InstanceId;
                    IResponse iResponse = await ActorLocationSenderComponent.Instance.Call(unitId, actorLocationRequest);
                    iResponse.RpcId = rpcId;
                    // session可能已经断开了,所以这里需要判断
                    if (session.InstanceId == instanceId)
                    {
                        session.Send(iResponse);
                    }
                    break;
                }
                case IActorLocationMessage actorLocationMessage:
                {
                    long unitId = session.GetComponent<SessionPlayerComponent>().PlayerId;
                    ActorLocationSenderComponent.Instance.Send(unitId, actorLocationMessage);
                    break;
                }
                case IActorRequest actorRequest:  // 分发IActorRequest消息,目前没有用到,需要的自己添加
                {
                    break;
                }
                case IActorMessage actorMessage:  // 分发IActorMessage消息,目前没有用到,需要的自己添加
                {
                    break;
                }
				
                default:
                {
                    // 非Actor消息
                    MessageDispatcherComponent.Instance.Handle(session, message);
                    break;
                }
            }
        }
    }
}

 

客户端读取消息触发NetClientComponentSystem的OnRead,通知原有会话消息返回了

private static void OnRead(this NetClientComponent self, long channelId, long actorId, object message)
{
    //找到原来的会话
    Session session = self.GetChild<Session>(channelId);
    if (session == null)
    {
        return;
    }

    session.LastRecvTime = TimeHelper.ClientNow();

    OpcodeHelper.LogMsg(self.DomainZone(), message);

    //触发NetClientComponentOnRead事件
    EventSystem.Instance.Publish(Root.Instance.Scene, new NetClientComponentOnRead() { Session = session, Message = message });
}


public class NetClientComponentOnReadEvent : AEvent<NetClientComponentOnRead>
{
    protected override async ETTask Run(Scene scene, NetClientComponentOnRead args)
    {
        Session session = args.Session;
        object message = args.Message;
        if (message is IResponse response)
        {
            session.OnResponse(response);
            return;
        }

        // 普通消息或者是Rpc请求消息
        MessageDispatcherComponent.Instance.Handle(session, message);
        await ETTask.CompletedTask;
    }
}

会话设置请求结果

public static void OnResponse(this Session self, IResponse response)
{
    if (!self.requestCallbacks.TryGetValue(response.RpcId, out var action))
    {
        return;
    }

    self.requestCallbacks.Remove(response.RpcId);
    if (ErrorCore.IsRpcNeedThrowException(response.Error))
    {
        action.Tcs.SetException(new Exception($"Rpc error, request: {action.Request} response: {response}"));
        return;
    }
    action.Tcs.SetResult(response);
}