Shadowrun: Awakened 29 September 2011 - Build 871
MessagePump.cpp
Go to the documentation of this file.
00001 #include "MessagePump.h"
00002 
00003 #include "ClientLoginMessage.h"
00004 #include "IncommingConnectionMessage.h"
00005 #include "GetClientCharacterListMessage.h"
00006 #include "CreateCharacterMessage.h"
00007 #include "JoinChannelMessage.h"
00008 
00009 namespace SraNetwork
00010 {
00011     // The idea is ok, but we need to properly delete all classes
00012     // when destroying the msg pump so we need to store the instances.
00013     // Processor function definition:
00014     // typedef void (SraBaseMessage::*process)(SraPacket*, RakNet::SystemAddress*);
00015     
00016     // A map of SraNetwork::MessageIdentifier->BaseMessageImpl
00017     tbb::concurrent_hash_map<int, SraBaseMessage*>  MessagePump::messageProcessorTable;
00018 
00019     // The current list of tasks.
00020     MessagePump* MessagePump::Instance = new MessagePump();
00021 
00022     bool MessagePump::isRunning = true;
00023 
00024     tbb::concurrent_queue<MessageData> MessagePump::messageList;
00025 
00026     // Creates a new message pump and initalizes the processor tasks.
00027     MessagePump::MessagePump(void)
00028     {
00029         // Add all message processors 
00030         // TODO: We should be able to to this in a nicer way:
00031         tbb::concurrent_hash_map<int, SraBaseMessage*>::accessor acc;
00032         messageProcessorTable.insert(acc, (int)ID_LOGIN_REQ);
00033         acc->second = new ClientLoginMessage();
00034 
00035         messageProcessorTable.insert(acc, (int)ID_NEW_INCOMING_CONNECTION);
00036         acc->second = new IncommingConnectionMessage();
00037 
00038         messageProcessorTable.insert(acc, (int)ID_GET_CLIENT_CHARS);
00039         acc->second = new GetClientCharacterListMessage();
00040         
00041         messageProcessorTable.insert(acc, (int)ID_CREATE_CHAR);
00042         acc->second = new CreateCharacterMessage();
00043 
00044         messageProcessorTable.insert(acc, (int)ID_CHAT_CHANNEL_REGISTER);
00045         acc->second = new JoinChannelMessage();
00046     }
00047 
00048     // Destroys all data.
00049     MessagePump::~MessagePump(void)
00050     {
00051         
00052     }
00053 
00054     // Called from the tbb tasks and adds a processor to the queue.
00055     void MessagePump::AddMessage(RakNet::Packet* packet)
00056     {
00057         // Create message data from packet and address 
00058         MessageData nMsg;
00059         nMsg.address = packet->systemAddress;;
00060         
00061         // Make a copy of the data because we will dispose
00062         // this package as soon as we will return from this 
00063         // call !
00064         nMsg.stream = RakNet::BitStream( packet->data, packet->length, true);
00065 
00066         messageList.push(nMsg );
00067         
00068         isRunning = true;
00069     }
00070 
00071     // Stops all processing
00072     void MessagePump::Shutdown()
00073     {
00074         isRunning = false;
00075     }
00076 
00077     // Run and process messages.
00078     void MessagePump::Run()
00079     {
00080         // This is of course a potential bottle-neck here, because we might have
00081         // a large number of messages in the queue, so what we want to do is:
00082         // Create a number of concurrent threads that read the messages from the
00083         // pump and process the data. 
00084         while (isRunning)
00085         {
00086             MessageData message;
00087             if (messageList.try_pop(message))
00088             {
00089                 // First de-serialize the base package
00090                 // this will give us the op-code
00091                 // We could simply do a stream.read<unsigned char>
00092                 // but this way we could store more base data
00093                 // later on.
00094                 SraPacket p;
00095                 p.Deserialize(&message.stream);
00096 
00097                 int opCode = p.opCode;
00098 
00099                 std::cout << "Processing message " << opCode << std::endl;
00100                 tbb::concurrent_hash_map<int, SraBaseMessage*>::accessor acc;
00101                 if (messageProcessorTable.find(acc, opCode))
00102                 {
00103                     acc->second->process( &message );
00104                 }
00105                 // We might want to add an else case to send "unknown opcode" messages 
00106                 // back to the client, but do we really want to ?
00107             } else
00108             {
00109                 Sleep(10);
00110             }
00111         }
00112 
00113         //  } else if (packet.opcode == ID_CONNECTION_LOST || packet.opcode == ID_DISCONNECTION_NOTIFICATION) 
00114         //  {
00115         //      printf("Connection lost, but maybe the client is just changing the zone, so do not remove client\n");
00116         //      //TODO: We do need some sort of keepAlive mechanism to detect dead clients....
00117         //      /*
00118         //      ClientRegister* reg = ClientRegister::getInstance();
00119         //      reg->removeClient( address );
00120         //      */
00121         //  }  else if (packet.opcode == ID_CONNECT_TO_SRV)
00122         //  {
00123         //      //User tries to connect to 'his' server, we won't, however, let 
00124         //      //the client decide which zone, because we can't trust him.
00125         //      //This implies that a zone-port event needs to be sent from the udk
00126         //      //server instance if a player is moving from zoneA to zoneB. 
00127         //      ClientRegister* reg = ClientRegister::getInstance();
00128         //      const SraClientData *cdat = reg->getClientFromAddress( address );
00129         //      //cdat will be NULL if the client has not logged in yet
00130         //      if (cdat != NULL) {
00131         //          RakNet::RakPeerInterface *zoneI = WorldServer::getZoneInterface();
00132         //          if ( zoneI != NULL && zoneI->NumberOfConnections() > 0 ) 
00133         //          {
00134         //              printf("Client logged in and tries to connect to zone %d\n",cdat->zoneID);
00135         //              SraZonePacket p;
00136         //              ZeroMemory(&p, sizeof(p));
00137         //              p.clientID = cdat->clientID;
00138         //              p.fromZoneID = -1;
00139         //              p.toZoneID = cdat->zoneID;
00140         //              zoneI->Send( (char*)&p, sizeof(p), MEDIUM_PRIORITY, RELIABLE_ORDERED, 0,  RakNet::UNASSIGNED_SYSTEM_ADDRESS, true);
00141         //              //We are done now, because the main zone thread will receive the package
00142         //          } else 
00143         //          {
00144         //              printf("No zone server connected, sending error msg to client\n");
00145 
00146         //          }
00147         //      }
00148         //  }  else if (packet.opcode == ID_CHAT_MSG_SENT) 
00149         //  {
00150         //      printf("Chat message\n");
00151         //  } else if (packet.opcode == ID_CHAT_MSG_REQUEST_PENDING) 
00152         //  {
00153 
00154         //  }
00155         //  else {
00156         //      printf("Unknown opcode %d \n", packet.data[0]);
00157         //  }
00158 
00159     }
00160 }

Copyright © 2007-2010 by The Shadowrun: Awakened Team. This work is licensed under the GNU Lesser General Public License 3.

GNU Lesser General Public License 3 Sourceforge.net