Beggining of a actually working steam networking sockets implementation.

This commit is contained in:
Mr_Goldberg 2019-07-15 20:00:52 -04:00
parent e252f83e8a
commit 66932785c3
No known key found for this signature in database
GPG key ID: 8597D87419DEF278
2 changed files with 327 additions and 20 deletions

View file

@ -112,8 +112,7 @@ message Network_Old {
message Networking_Sockets {
enum Types {
CONNECTION_REQUEST_IP = 0;
CONNECTION_REQUEST_STEAMID = 1;
CONNECTION_REQUEST = 0;
CONNECTION_ACCEPTED = 2;
CONNECTION_END = 3;
DATA = 4;

View file

@ -21,11 +21,30 @@ struct Listen_Socket {
HSteamListenSocket socket_id;
int virtual_port;
uint32 ip;
uint16 port;
};
enum connect_socket_status {
CONNECT_SOCKET_NO_CONNECTION,
CONNECT_SOCKET_CONNECTING,
CONNECT_SOCKET_NOT_ACCEPTED,
CONNECT_SOCKET_CONNECTED,
CONNECT_SOCKET_CLOSED,
CONNECT_SOCKET_TIMEDOUT
};
struct Connect_Socket {
int virtual_port;
SteamNetworkingIdentity remote_identity;
HSteamNetConnection remote_id;
HSteamListenSocket listen_socket_id;
enum connect_socket_status status;
int64 user_data;
std::queue<std::string> data;
};
class Steam_Networking_Sockets :
public ISteamNetworkingSockets001,
@ -38,6 +57,7 @@ public ISteamNetworkingSockets
class RunEveryRunCB *run_every_runcb;
std::vector<struct Listen_Socket> listen_sockets;
std::map<HSteamNetConnection, struct Connect_Socket> connect_sockets;
public:
static void steam_callback(void *object, Common_Message *msg)
{
@ -75,15 +95,18 @@ Steam_Networking_Sockets(class Settings *settings, class Networking *network, cl
}
HSteamListenSocket new_listen_socket(int nSteamConnectVirtualPort, uint32 nIP, uint16 nPort)
HSteamListenSocket new_listen_socket(int nSteamConnectVirtualPort)
{
static HSteamListenSocket socket_id;
++socket_id;
if (socket_id == k_HSteamListenSocket_Invalid) ++socket_id;
auto conn = std::find_if(listen_sockets.begin(), listen_sockets.end(), [&nSteamConnectVirtualPort](struct Listen_Socket const& conn) { return conn.virtual_port == nSteamConnectVirtualPort;});
if (conn != listen_sockets.end()) return k_HSteamListenSocket_Invalid;
struct Listen_Socket listen_socket;
listen_socket.socket_id = socket_id;
listen_socket.virtual_port = nSteamConnectVirtualPort;
listen_socket.ip = nIP;
listen_socket.port = nPort;
listen_sockets.push_back(listen_socket);
return socket_id;
}
@ -95,6 +118,80 @@ struct Listen_Socket *get_connection_socket(HSteamListenSocket id)
return &(*conn);
}
bool send_packet_new_connection(HSteamNetConnection m_hConn)
{
auto connect_socket = connect_sockets.find(m_hConn);
if (connect_socket == connect_sockets.end()) return false;
//TODO: right now this only supports connecting with steam id, might need to make ip/port connections work in the future when I find a game that uses them.
Common_Message msg;
msg.set_source_id(settings->get_local_steam_id().ConvertToUint64());
msg.set_dest_id(connect_socket->second.remote_identity.GetSteamID64());
msg.set_allocated_networking_sockets(new Networking_Sockets);
if (connect_socket->second.status == CONNECT_SOCKET_CONNECTING) {
msg.mutable_networking_sockets()->set_type(Networking_Sockets::CONNECTION_REQUEST);
} else if (connect_socket->second.status == CONNECT_SOCKET_CONNECTED) {
msg.mutable_networking_sockets()->set_type(Networking_Sockets::CONNECTION_ACCEPTED);
}
msg.mutable_networking_sockets()->set_port(connect_socket->second.virtual_port);
msg.mutable_networking_sockets()->set_connection_id_from(connect_socket->first);
msg.mutable_networking_sockets()->set_connection_id(connect_socket->second.remote_id);
return network->sendTo(&msg, true);
}
HSteamNetConnection new_connect_socket(SteamNetworkingIdentity remote_identity, int virtual_port, enum connect_socket_status status=CONNECT_SOCKET_CONNECTING, HSteamListenSocket listen_socket_id=k_HSteamListenSocket_Invalid, HSteamNetConnection remote_id=k_HSteamNetConnection_Invalid)
{
Connect_Socket socket = {};
socket.remote_identity = remote_identity;
socket.virtual_port = virtual_port;
socket.listen_socket_id = listen_socket_id;
socket.remote_id = remote_id;
socket.status = status;
socket.user_data = -1;
static HSteamNetConnection socket_id;
++socket_id;
if (socket_id == k_HSteamNetConnection_Invalid) ++socket_id;
if (connect_sockets.insert(std::make_pair(socket_id, socket)).second == false) {
return k_HSteamNetConnection_Invalid;
}
return socket_id;
}
ESteamNetworkingConnectionState convert_status(enum connect_socket_status old_status)
{
if (old_status == CONNECT_SOCKET_NO_CONNECTION) return k_ESteamNetworkingConnectionState_None;
if (old_status == CONNECT_SOCKET_CONNECTING) return k_ESteamNetworkingConnectionState_Connecting;
if (old_status == CONNECT_SOCKET_NOT_ACCEPTED) return k_ESteamNetworkingConnectionState_Connecting;
if (old_status == CONNECT_SOCKET_CONNECTED) return k_ESteamNetworkingConnectionState_Connected;
if (old_status == CONNECT_SOCKET_CLOSED) return k_ESteamNetworkingConnectionState_ClosedByPeer;
if (old_status == CONNECT_SOCKET_TIMEDOUT) return k_ESteamNetworkingConnectionState_ProblemDetectedLocally;
return k_ESteamNetworkingConnectionState_None;
}
void launch_callback(HSteamNetConnection m_hConn, enum connect_socket_status old_status)
{
auto connect_socket = connect_sockets.find(m_hConn);
if (connect_socket == connect_sockets.end()) return;
struct SteamNetConnectionStatusChangedCallback_t data = {};
data.m_hConn = connect_socket->first;
data.m_info.m_identityRemote = connect_socket->second.remote_identity;
data.m_info.m_hListenSocket = connect_socket->second.listen_socket_id;
data.m_info.m_nUserData = connect_socket->second.user_data;
//TODO
//m_addrRemote
//m_eEndReason
data.m_info.m_eState = convert_status(connect_socket->second.status);
data.m_eOldState = convert_status(old_status);
callbacks->addCBResult(data.k_iCallback, &data, sizeof(data));
}
/// Creates a "server" socket that listens for clients to connect to, either by calling
/// ConnectSocketBySteamID or ConnectSocketByIPv4Address.
///
@ -122,7 +219,7 @@ HSteamListenSocket CreateListenSocket( int nSteamConnectVirtualPort, uint32 nIP,
{
PRINT_DEBUG("Steam_Networking_Sockets::CreateListenSocket %i %u %u\n", nSteamConnectVirtualPort, nIP, nPort);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
return new_listen_socket(nSteamConnectVirtualPort, nIP, nPort);
return new_listen_socket(nSteamConnectVirtualPort);
}
/// Creates a "server" socket that listens for clients to connect to by
@ -179,7 +276,9 @@ HSteamNetConnection ConnectByIPAddress( const SteamNetworkingIPAddr &address )
/// when your app initializes
HSteamListenSocket CreateListenSocketP2P( int nVirtualPort )
{
PRINT_DEBUG("Steam_Networking_Sockets::CreateListenSocketP2P\n");
PRINT_DEBUG("Steam_Networking_Sockets::CreateListenSocketP2P %i\n", nVirtualPort);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
return new_listen_socket(nVirtualPort);
}
/// Begin connecting to a server that is identified using a platform-specific identifier.
@ -195,7 +294,24 @@ HSteamListenSocket CreateListenSocketP2P( int nVirtualPort )
/// when your app initializes
HSteamNetConnection ConnectP2P( const SteamNetworkingIdentity &identityRemote, int nVirtualPort )
{
PRINT_DEBUG("Steam_Networking_Sockets::ConnectP2P\n");
PRINT_DEBUG("Steam_Networking_Sockets::ConnectP2P %u\n", nVirtualPort);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
const SteamNetworkingIPAddr *ip = identityRemote.GetIPAddr();
if (identityRemote.m_eType == k_ESteamNetworkingIdentityType_SteamID) {
PRINT_DEBUG("Steam_Networking_Sockets::ConnectP2P %llu\n", identityRemote.GetSteamID64());
//steam id identity
} else if (ip) {
PRINT_DEBUG("Steam_Networking_Sockets::ConnectP2P %u:%u ipv4? %u\n", ip->GetIPv4(), ip->m_port, ip->IsIPv4());
//ip addr
} else {
return k_HSteamNetConnection_Invalid;
}
HSteamNetConnection socket = new_connect_socket(identityRemote, nVirtualPort);
send_packet_new_connection(socket);
return socket;
}
/// Creates a connection and begins talking to a remote destination. The remote host
@ -254,7 +370,16 @@ HSteamNetConnection ConnectByIPv4Address( uint32 nIP, uint16 nPort )
/// notification being posted to the queue and when it is received by the application.)
EResult AcceptConnection( HSteamNetConnection hConn )
{
PRINT_DEBUG("Steam_Networking_Sockets::AcceptConnection\n");
PRINT_DEBUG("Steam_Networking_Sockets::AcceptConnection %u\n", hConn);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto connect_socket = connect_sockets.find(hConn);
if (connect_socket == connect_sockets.end()) return k_EResultInvalidParam;
if (connect_socket->second.status != CONNECT_SOCKET_NOT_ACCEPTED) return k_EResultInvalidState;
connect_socket->second.status = CONNECT_SOCKET_CONNECTED;
send_packet_new_connection(connect_socket->first);
return k_EResultOK;
}
@ -281,7 +406,27 @@ EResult AcceptConnection( HSteamNetConnection hConn )
/// ignored.
bool CloseConnection( HSteamNetConnection hPeer, int nReason, const char *pszDebug, bool bEnableLinger )
{
PRINT_DEBUG("Steam_Networking_Sockets::CloseConnection\n");
PRINT_DEBUG("Steam_Networking_Sockets::CloseConnection %u\n", hPeer);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto connect_socket = connect_sockets.find(hPeer);
if (connect_socket == connect_sockets.end()) return false;
if (connect_socket->second.status != CONNECT_SOCKET_CLOSED && connect_socket->second.status != CONNECT_SOCKET_TIMEDOUT) {
//TODO send/nReason and pszDebug
Common_Message msg;
msg.set_source_id(settings->get_local_steam_id().ConvertToUint64());
msg.set_dest_id(connect_socket->second.remote_identity.GetSteamID64());
msg.set_allocated_networking_sockets(new Networking_Sockets);
msg.mutable_networking_sockets()->set_type(Networking_Sockets::CONNECTION_END);
msg.mutable_networking_sockets()->set_port(connect_socket->second.virtual_port);
msg.mutable_networking_sockets()->set_connection_id_from(connect_socket->first);
msg.mutable_networking_sockets()->set_connection_id(connect_socket->second.remote_id);
network->sendTo(&msg, true);
}
connect_sockets.erase(connect_socket);
return true;
}
@ -304,12 +449,40 @@ bool CloseListenSocket( HSteamListenSocket hSocket, const char *pszNotifyRemoteR
bool CloseListenSocket( HSteamListenSocket hSocket )
{
PRINT_DEBUG("Steam_Networking_Sockets::CloseListenSocket\n");
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto conn = std::find_if(listen_sockets.begin(), listen_sockets.end(), [&hSocket](struct Listen_Socket const& conn) { return conn.socket_id == hSocket;});
if (conn == listen_sockets.end()) return false;
std::queue<HSteamNetConnection> to_close;
auto socket_conn = std::begin(connect_sockets);
while (socket_conn != std::end(connect_sockets)) {
if (socket_conn->second.listen_socket_id == hSocket) {
to_close.push(socket_conn->first);
}
++socket_conn;
}
while (to_close.size()) {
CloseConnection(to_close.front(), 0, "", false);
to_close.pop();
}
listen_sockets.erase(conn);
return true;
}
/// Set connection user data. Returns false if the handle is invalid.
bool SetConnectionUserData( HSteamNetConnection hPeer, int64 nUserData )
{
PRINT_DEBUG("Steam_Networking_Sockets::SetConnectionUserData\n");
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto connect_socket = connect_sockets.find(hPeer);
if (connect_socket == connect_sockets.end()) return false;
connect_socket->second.user_data = nUserData;
return true;
}
@ -318,6 +491,10 @@ bool SetConnectionUserData( HSteamNetConnection hPeer, int64 nUserData )
int64 GetConnectionUserData( HSteamNetConnection hPeer )
{
PRINT_DEBUG("Steam_Networking_Sockets::GetConnectionUserData\n");
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto connect_socket = connect_sockets.find(hPeer);
if (connect_socket == connect_sockets.end()) return -1;
return connect_socket->second.user_data;
}
@ -400,7 +577,29 @@ EResult SendMessageToConnection( HSteamNetConnection hConn, const void *pData, u
/// (See k_ESteamNetworkingConfig_SendBufferSize)
virtual EResult SendMessageToConnection( HSteamNetConnection hConn, const void *pData, uint32 cbData, int nSendFlags )
{
PRINT_DEBUG("Steam_Networking_Sockets::SendMessageToConnection\n");
PRINT_DEBUG("Steam_Networking_Sockets::SendMessageToConnection %u, len %u, flags %i\n", hConn, cbData, nSendFlags);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
auto connect_socket = connect_sockets.find(hConn);
if (connect_socket == connect_sockets.end()) return k_EResultInvalidParam;
if (connect_socket->second.status == CONNECT_SOCKET_CLOSED) return k_EResultNoConnection;
if (connect_socket->second.status == CONNECT_SOCKET_TIMEDOUT) return k_EResultNoConnection;
if (connect_socket->second.status != CONNECT_SOCKET_CONNECTED) return k_EResultInvalidState;
Common_Message msg;
msg.set_source_id(settings->get_local_steam_id().ConvertToUint64());
msg.set_dest_id(connect_socket->second.remote_identity.GetSteamID64());
msg.set_allocated_networking_sockets(new Networking_Sockets);
msg.mutable_networking_sockets()->set_type(Networking_Sockets::DATA);
msg.mutable_networking_sockets()->set_port(connect_socket->second.virtual_port);
msg.mutable_networking_sockets()->set_connection_id_from(connect_socket->first);
msg.mutable_networking_sockets()->set_connection_id(connect_socket->second.remote_id);
msg.mutable_networking_sockets()->set_data(pData, cbData);
bool reliable = false;
if (nSendFlags & k_nSteamNetworkingSend_Reliable) reliable = true;
if (network->sendTo(&msg, reliable)) return k_EResultOK;
return k_EResultFail;
}
/// If Nagle is enabled (its on by default) then when calling
@ -414,6 +613,41 @@ EResult FlushMessagesOnConnection( HSteamNetConnection hConn )
PRINT_DEBUG("Steam_Networking_Sockets::FlushMessagesOnConnection\n");
}
static void free_steam_message_data(SteamNetworkingMessage_t *pMsg)
{
free(pMsg->m_pData);
pMsg->m_pData = NULL;
}
static void delete_steam_message(SteamNetworkingMessage_t *pMsg)
{
if (pMsg->m_pfnFreeData) pMsg->m_pfnFreeData(pMsg);
delete pMsg;
}
SteamNetworkingMessage_t *get_steam_message_connection(HSteamNetConnection hConn)
{
auto connect_socket = connect_sockets.find(hConn);
if (connect_socket == connect_sockets.end()) return NULL;
if (connect_socket->second.data.empty()) return NULL;
SteamNetworkingMessage_t *pMsg = new SteamNetworkingMessage_t();
unsigned long size = connect_socket->second.data.front().size();
pMsg->m_pData = malloc(size);
pMsg->m_cbSize = size;
memcpy(pMsg->m_pData, connect_socket->second.data.front().data(), size);
pMsg->m_conn = hConn;
pMsg->m_sender = connect_socket->second.remote_identity;
pMsg->m_nConnUserData = connect_socket->second.user_data;
//TODO
//pMsg->m_usecTimeReceived =
//pMsg->m_nMessageNumber =
pMsg->m_pfnFreeData = &free_steam_message_data;
pMsg->m_pfnRelease = &delete_steam_message;
pMsg->m_nChannel = 0;
connect_socket->second.data.pop();
return pMsg;
}
/// Fetch the next available message(s) from the connection, if any.
/// Returns the number of messages returned into your array, up to nMaxMessages.
/// If the connection handle is invalid, -1 is returned.
@ -431,7 +665,18 @@ EResult FlushMessagesOnConnection( HSteamNetConnection hConn )
/// a little while (put it into some queue, etc), and you may call Release() from any thread.
int ReceiveMessagesOnConnection( HSteamNetConnection hConn, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
PRINT_DEBUG("Steam_Networking_Sockets::ReceiveMessagesOnConnection\n");
PRINT_DEBUG("Steam_Networking_Sockets::ReceiveMessagesOnConnection %u %i\n", hConn, nMaxMessages);
if (!ppOutMessages || !nMaxMessages) return 0;
std::lock_guard<std::recursive_mutex> lock(global_mutex);
SteamNetworkingMessage_t *msg = NULL;
int messages = 0;
while ((msg = get_steam_message_connection(hConn)) && messages < nMaxMessages) {
ppOutMessages[messages] = msg;
++messages;
}
return messages;
}
/// Same as ReceiveMessagesOnConnection, but will return the next message available
@ -444,7 +689,26 @@ int ReceiveMessagesOnConnection( HSteamNetConnection hConn, SteamNetworkingMessa
/// messages is relevant!)
int ReceiveMessagesOnListenSocket( HSteamListenSocket hSocket, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages )
{
PRINT_DEBUG("Steam_Networking_Sockets::ReceiveMessagesOnListenSocket\n");
PRINT_DEBUG("Steam_Networking_Sockets::ReceiveMessagesOnListenSocket %u %i\n", hSocket, nMaxMessages);
if (!ppOutMessages || !nMaxMessages) return 0;
std::lock_guard<std::recursive_mutex> lock(global_mutex);
SteamNetworkingMessage_t *msg = NULL;
int messages = 0;
auto socket_conn = std::begin(connect_sockets);
while (socket_conn != std::end(connect_sockets) && messages < nMaxMessages) {
if (socket_conn->second.listen_socket_id == hSocket) {
while ((msg = get_steam_message_connection(socket_conn->first)) && messages < nMaxMessages) {
ppOutMessages[messages] = msg;
++messages;
}
}
++socket_conn;
}
return messages;
}
/// Returns basic information about the high-level state of the connection.
@ -539,8 +803,8 @@ bool GetListenSocketInfo( HSteamListenSocket hSocket, uint32 *pnIP, uint16 *pnPo
std::lock_guard<std::recursive_mutex> lock(global_mutex);
struct Listen_Socket *socket = get_connection_socket(hSocket);
if (!socket) return false;
if (pnIP) *pnIP = socket->ip;
if (pnPort) *pnPort = socket->port;
if (pnIP) *pnIP = 0;//socket->ip;
if (pnPort) *pnPort = 0;//socket->port;
return true;
}
@ -725,7 +989,7 @@ HSteamListenSocket CreateHostedDedicatedServerListenSocket( int nVirtualPort )
{
PRINT_DEBUG("Steam_Networking_Sockets::CreateHostedDedicatedServerListenSocket %i\n", nVirtualPort);
std::lock_guard<std::recursive_mutex> lock(global_mutex);
return new_listen_socket(nVirtualPort, 0, 0);
return new_listen_socket(nVirtualPort);
}
@ -815,6 +1079,7 @@ void RunCallbacks( ISteamNetworkingSocketsCallbacks *pCallbacks )
void RunCallbacks()
{
//TODO: timeout unaccepted connections after a few seconds or so
}
void Callback(Common_Message *msg)
@ -825,12 +1090,55 @@ void Callback(Common_Message *msg)
}
if (msg->low_level().type() == Low_Level::DISCONNECT) {
for (auto & connect_socket : connect_sockets) {
if (connect_socket.second.remote_identity.GetSteamID64() == msg->source_id()) {
enum connect_socket_status old_status = connect_socket.second.status;
connect_socket.second.status = CONNECT_SOCKET_TIMEDOUT;
launch_callback(connect_socket.first, old_status);
}
}
}
}
if (msg->has_networking_sockets()) {
PRINT_DEBUG("Steam_Networking_Sockets: got network socket msg %u\n", msg->networking_sockets().type());
if (msg->networking_sockets().type() == Networking_Sockets::CONNECTION_REQUEST) {
int virtual_port = msg->networking_sockets().port();
auto conn = std::find_if(listen_sockets.begin(), listen_sockets.end(), [&virtual_port](struct Listen_Socket const& conn) { return conn.virtual_port == virtual_port;});
if (conn != listen_sockets.end()) {
SteamNetworkingIdentity identity;
identity.SetSteamID64(msg->source_id());
HSteamNetConnection new_connection = new_connect_socket(identity, virtual_port, CONNECT_SOCKET_NOT_ACCEPTED, conn->socket_id, msg->networking_sockets().connection_id_from());
launch_callback(new_connection, CONNECT_SOCKET_NO_CONNECTION);
}
} else if (msg->networking_sockets().type() == Networking_Sockets::CONNECTION_ACCEPTED) {
auto connect_socket = connect_sockets.find(msg->networking_sockets().connection_id());
if (connect_socket != connect_sockets.end()) {
if (connect_socket->second.remote_identity.GetSteamID64() == msg->source_id() && connect_socket->second.status == CONNECT_SOCKET_CONNECTING) {
connect_socket->second.remote_id = msg->networking_sockets().connection_id_from();
connect_socket->second.status = CONNECT_SOCKET_CONNECTED;
launch_callback(connect_socket->first, CONNECT_SOCKET_CONNECTING);
}
}
} else if (msg->networking_sockets().type() == Networking_Sockets::DATA) {
auto connect_socket = connect_sockets.find(msg->networking_sockets().connection_id());
if (connect_socket != connect_sockets.end()) {
if (connect_socket->second.remote_identity.GetSteamID64() == msg->source_id() && connect_socket->second.status == CONNECT_SOCKET_CONNECTED) {
connect_socket->second.data.push(msg->networking_sockets().data());
}
}
} else if (msg->networking_sockets().type() == Networking_Sockets::CONNECTION_END) {
auto connect_socket = connect_sockets.find(msg->networking_sockets().connection_id());
if (connect_socket != connect_sockets.end()) {
if (connect_socket->second.remote_identity.GetSteamID64() == msg->source_id() && connect_socket->second.status == CONNECT_SOCKET_CONNECTED) {
enum connect_socket_status old_status = connect_socket->second.status;
connect_socket->second.status = CONNECT_SOCKET_CLOSED;
launch_callback(connect_socket->first, old_status);
}
}
}
}
}