![]() |
Shadowrun: Awakened 29 September 2011 - Build 871
|
00001 #include "MessagePump.h" 00002 00003 #include "ClientLoginMessage.h" 00004 #include "IncommingConnectionMessage.h" 00005 #include "GetClientCharacterListMessage.h" 00006 #include "CreateCharacterMessage.h" 00007 #include "JoinChannelMessage.h" 00008 00009 namespace SraNetwork 00010 { 00011 // The idea is ok, but we need to properly delete all classes 00012 // when destroying the msg pump so we need to store the instances. 00013 // Processor function definition: 00014 // typedef void (SraBaseMessage::*process)(SraPacket*, RakNet::SystemAddress*); 00015 00016 // A map of SraNetwork::MessageIdentifier->BaseMessageImpl 00017 tbb::concurrent_hash_map<int, SraBaseMessage*> MessagePump::messageProcessorTable; 00018 00019 // The current list of tasks. 00020 MessagePump* MessagePump::Instance = new MessagePump(); 00021 00022 bool MessagePump::isRunning = true; 00023 00024 tbb::concurrent_queue<MessageData> MessagePump::messageList; 00025 00026 // Creates a new message pump and initalizes the processor tasks. 00027 MessagePump::MessagePump(void) 00028 { 00029 // Add all message processors 00030 // TODO: We should be able to to this in a nicer way: 00031 tbb::concurrent_hash_map<int, SraBaseMessage*>::accessor acc; 00032 messageProcessorTable.insert(acc, (int)ID_LOGIN_REQ); 00033 acc->second = new ClientLoginMessage(); 00034 00035 messageProcessorTable.insert(acc, (int)ID_NEW_INCOMING_CONNECTION); 00036 acc->second = new IncommingConnectionMessage(); 00037 00038 messageProcessorTable.insert(acc, (int)ID_GET_CLIENT_CHARS); 00039 acc->second = new GetClientCharacterListMessage(); 00040 00041 messageProcessorTable.insert(acc, (int)ID_CREATE_CHAR); 00042 acc->second = new CreateCharacterMessage(); 00043 00044 messageProcessorTable.insert(acc, (int)ID_CHAT_CHANNEL_REGISTER); 00045 acc->second = new JoinChannelMessage(); 00046 } 00047 00048 // Destroys all data. 00049 MessagePump::~MessagePump(void) 00050 { 00051 00052 } 00053 00054 // Called from the tbb tasks and adds a processor to the queue. 00055 void MessagePump::AddMessage(RakNet::Packet* packet) 00056 { 00057 // Create message data from packet and address 00058 MessageData nMsg; 00059 nMsg.address = packet->systemAddress;; 00060 00061 // Make a copy of the data because we will dispose 00062 // this package as soon as we will return from this 00063 // call ! 00064 nMsg.stream = RakNet::BitStream( packet->data, packet->length, true); 00065 00066 messageList.push(nMsg ); 00067 00068 isRunning = true; 00069 } 00070 00071 // Stops all processing 00072 void MessagePump::Shutdown() 00073 { 00074 isRunning = false; 00075 } 00076 00077 // Run and process messages. 00078 void MessagePump::Run() 00079 { 00080 // This is of course a potential bottle-neck here, because we might have 00081 // a large number of messages in the queue, so what we want to do is: 00082 // Create a number of concurrent threads that read the messages from the 00083 // pump and process the data. 00084 while (isRunning) 00085 { 00086 MessageData message; 00087 if (messageList.try_pop(message)) 00088 { 00089 // First de-serialize the base package 00090 // this will give us the op-code 00091 // We could simply do a stream.read<unsigned char> 00092 // but this way we could store more base data 00093 // later on. 00094 SraPacket p; 00095 p.Deserialize(&message.stream); 00096 00097 int opCode = p.opCode; 00098 00099 std::cout << "Processing message " << opCode << std::endl; 00100 tbb::concurrent_hash_map<int, SraBaseMessage*>::accessor acc; 00101 if (messageProcessorTable.find(acc, opCode)) 00102 { 00103 acc->second->process( &message ); 00104 } 00105 // We might want to add an else case to send "unknown opcode" messages 00106 // back to the client, but do we really want to ? 00107 } else 00108 { 00109 Sleep(10); 00110 } 00111 } 00112 00113 // } else if (packet.opcode == ID_CONNECTION_LOST || packet.opcode == ID_DISCONNECTION_NOTIFICATION) 00114 // { 00115 // printf("Connection lost, but maybe the client is just changing the zone, so do not remove client\n"); 00116 // //TODO: We do need some sort of keepAlive mechanism to detect dead clients.... 00117 // /* 00118 // ClientRegister* reg = ClientRegister::getInstance(); 00119 // reg->removeClient( address ); 00120 // */ 00121 // } else if (packet.opcode == ID_CONNECT_TO_SRV) 00122 // { 00123 // //User tries to connect to 'his' server, we won't, however, let 00124 // //the client decide which zone, because we can't trust him. 00125 // //This implies that a zone-port event needs to be sent from the udk 00126 // //server instance if a player is moving from zoneA to zoneB. 00127 // ClientRegister* reg = ClientRegister::getInstance(); 00128 // const SraClientData *cdat = reg->getClientFromAddress( address ); 00129 // //cdat will be NULL if the client has not logged in yet 00130 // if (cdat != NULL) { 00131 // RakNet::RakPeerInterface *zoneI = WorldServer::getZoneInterface(); 00132 // if ( zoneI != NULL && zoneI->NumberOfConnections() > 0 ) 00133 // { 00134 // printf("Client logged in and tries to connect to zone %d\n",cdat->zoneID); 00135 // SraZonePacket p; 00136 // ZeroMemory(&p, sizeof(p)); 00137 // p.clientID = cdat->clientID; 00138 // p.fromZoneID = -1; 00139 // p.toZoneID = cdat->zoneID; 00140 // zoneI->Send( (char*)&p, sizeof(p), MEDIUM_PRIORITY, RELIABLE_ORDERED, 0, RakNet::UNASSIGNED_SYSTEM_ADDRESS, true); 00141 // //We are done now, because the main zone thread will receive the package 00142 // } else 00143 // { 00144 // printf("No zone server connected, sending error msg to client\n"); 00145 00146 // } 00147 // } 00148 // } else if (packet.opcode == ID_CHAT_MSG_SENT) 00149 // { 00150 // printf("Chat message\n"); 00151 // } else if (packet.opcode == ID_CHAT_MSG_REQUEST_PENDING) 00152 // { 00153 00154 // } 00155 // else { 00156 // printf("Unknown opcode %d \n", packet.data[0]); 00157 // } 00158 00159 } 00160 }
Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.