/* * This is the source code of tgnet library v. 1.0 * It is licensed under GNU GPL v. 2 or later. * You should have received a copy of the license in this archive (see LICENSE). * * Copyright Nikolai Kudashov, 2015. */ #include #include #include #include #include #include #include #include #include #include "ConnectionsManager.h" #include "FileLog.h" #include "EventObject.h" #include "MTProtoScheme.h" #include "ApiScheme.h" #include "NativeByteBuffer.h" #include "Connection.h" #include "Datacenter.h" #include "Request.h" #include "BuffersStorage.h" #include "ByteArray.h" #include "Config.h" #ifdef ANDROID #include JavaVM *javaVm = nullptr; JNIEnv *jniEnv = nullptr; jclass jclass_ByteBuffer = nullptr; jmethodID jclass_ByteBuffer_allocateDirect = 0; #endif static bool done = false; ConnectionsManager::ConnectionsManager() { if ((epolFd = epoll_create(128)) == -1) { DEBUG_E("unable to create epoll instance"); exit(1); } int flags; if ((flags = fcntl(epolFd, F_GETFD, NULL)) < 0) { DEBUG_W("fcntl(%d, F_GETFD)", epolFd); } if (!(flags & FD_CLOEXEC)) { if (fcntl(epolFd, F_SETFD, flags | FD_CLOEXEC) == -1) { DEBUG_W("fcntl(%d, F_SETFD)", epolFd); } } if ((epollEvents = new epoll_event[128]) == nullptr) { DEBUG_E("unable to allocate epoll events"); exit(1); } eventFd = eventfd(0, EFD_NONBLOCK); if (eventFd != -1) { struct epoll_event event = {0}; event.data.ptr = new EventObject(&eventFd, EventObjectTypeEvent); event.events = EPOLLIN | EPOLLET; if (epoll_ctl(epolFd, EPOLL_CTL_ADD, eventFd, &event) == -1) { eventFd = -1; FileLog::e("unable to add eventfd"); } } if (eventFd == -1) { pipeFd = new int[2]; if (pipe(pipeFd) != 0) { DEBUG_E("unable to create pipe"); exit(1); } flags = fcntl(pipeFd[0], F_GETFL); if (flags == -1) { DEBUG_E("fcntl get pipefds[0] failed"); exit(1); } if (fcntl(pipeFd[0], F_SETFL, flags | O_NONBLOCK) == -1) { DEBUG_E("fcntl set pipefds[0] failed"); exit(1); } flags = fcntl(pipeFd[1], F_GETFL); if (flags == -1) { DEBUG_E("fcntl get pipefds[1] failed"); exit(1); } if (fcntl(pipeFd[1], F_SETFL, flags | O_NONBLOCK) == -1) { DEBUG_E("fcntl set pipefds[1] failed"); exit(1); } EventObject *eventObject = new EventObject(pipeFd, EventObjectTypePipe); epoll_event eventMask = {}; eventMask.events = EPOLLIN; eventMask.data.ptr = eventObject; if (epoll_ctl(epolFd, EPOLL_CTL_ADD, pipeFd[0], &eventMask) != 0) { DEBUG_E("can't add pipe to epoll"); exit(1); } } networkBuffer = new NativeByteBuffer((uint32_t) READ_BUFFER_SIZE); if (networkBuffer == nullptr) { DEBUG_E("unable to allocate read buffer"); exit(1); } pthread_mutex_init(&mutex, NULL); } ConnectionsManager::~ConnectionsManager() { if (epolFd != 0) { close(epolFd); epolFd = 0; } pthread_mutex_destroy(&mutex); } ConnectionsManager& ConnectionsManager::getInstance() { static ConnectionsManager instance; return instance; } int ConnectionsManager::callEvents(int64_t now) { if (!events.empty()) { for (std::list::iterator iter = events.begin(); iter != events.end();) { EventObject *eventObject = (*iter); if (eventObject->time <= now) { iter = events.erase(iter); eventObject->onEvent(0); } else { int diff = (int) (eventObject->time - now); return diff > 1000 || diff < 0 ? 1000 : diff; } } } if (!networkPaused) { return 1000; } int32_t timeToPushPing = (sendingPushPing ? 30000 : 60000 * 3) - llabs(now - lastPushPingTime); if (timeToPushPing <= 0) { return 1000; } DEBUG_D("schedule next epoll wakeup in %d ms", timeToPushPing); return timeToPushPing; } void ConnectionsManager::checkPendingTasks() { while (true) { std::function task; pthread_mutex_lock(&mutex); if (pendingTasks.empty()) { pthread_mutex_unlock(&mutex); return; } task = pendingTasks.front(); pendingTasks.pop(); pthread_mutex_unlock(&mutex); task(); } } void ConnectionsManager::select() { checkPendingTasks(); int eventsCount = epoll_wait(epolFd, epollEvents, 128, callEvents(getCurrentTimeMonotonicMillis())); checkPendingTasks(); int64_t now = getCurrentTimeMonotonicMillis(); callEvents(now); for (int32_t a = 0; a < eventsCount; a++) { EventObject *eventObject = (EventObject *) epollEvents[a].data.ptr; eventObject->onEvent(epollEvents[a].events); } size_t count = activeConnections.size(); for (uint32_t a = 0; a < count; a++) { activeConnections[a]->checkTimeout(now); } Datacenter *datacenter = getDatacenterWithId(currentDatacenterId); if (pushConnectionEnabled) { if ((sendingPushPing && llabs(now - lastPushPingTime) >= 30000) || llabs(now - lastPushPingTime) >= 60000 * 3 + 10000) { lastPushPingTime = 0; sendingPushPing = false; if (datacenter != nullptr) { Connection *connection = datacenter->getPushConnection(false); if (connection != nullptr) { connection->suspendConnection(); } } DEBUG_D("push ping timeout"); } if (llabs(now - lastPushPingTime) >= 60000 * 3) { DEBUG_D("time for push ping"); lastPushPingTime = now; if (datacenter != nullptr) { sendPing(datacenter, true); } } } if (lastPauseTime != 0 && llabs(now - lastPauseTime) >= nextSleepTimeout) { bool dontSleep = !requestingSaltsForDc.empty(); if (!dontSleep) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) { dontSleep = true; break; } } } if (!dontSleep) { for (requestsIter iter = requestsQueue.begin(); iter != requestsQueue.end(); iter++) { Request *request = iter->get(); if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) { dontSleep = true; break; } } } if (!dontSleep) { if (!networkPaused) { DEBUG_D("pausing network and timers by sleep time = %d", nextSleepTimeout); for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { iter->second->suspendConnections(); } } networkPaused = true; return; } else { lastPauseTime = now; DEBUG_D("don't sleep because of salt, upload or download request"); } } if (networkPaused) { networkPaused = false; DEBUG_D("resume network and timers"); } if (delegate != nullptr) { delegate->onUpdate(); } if (datacenter != nullptr) { if (datacenter->hasAuthKey()) { if (llabs(now - lastPingTime) >= 19000) { lastPingTime = now; sendPing(datacenter, false); } if (abs((int32_t) (now / 1000) - lastDcUpdateTime) >= DC_UPDATE_TIME) { updateDcSettings(0, false); } processRequestQueue(0, 0); } else if (!datacenter->isHandshaking()) { datacenter->beginHandshake(true); } } } void ConnectionsManager::scheduleTask(std::function task) { pthread_mutex_lock(&mutex); pendingTasks.push(task); pthread_mutex_unlock(&mutex); wakeup(); } void ConnectionsManager::scheduleEvent(EventObject *eventObject, uint32_t time) { eventObject->time = getCurrentTimeMonotonicMillis() + time; std::list::iterator iter; for (iter = events.begin(); iter != events.end(); iter++) { if ((*iter)->time > eventObject->time) { break; } } events.insert(iter, eventObject); } void ConnectionsManager::removeEvent(EventObject *eventObject) { for (std::list::iterator iter = events.begin(); iter != events.end(); iter++) { if (*iter == eventObject) { events.erase(iter); break; } } } void ConnectionsManager::wakeup() { if (pipeFd == nullptr) { eventfd_write(eventFd, 1); } else { char ch = 'x'; write(pipeFd[1], &ch, 1); } } void *ConnectionsManager::ThreadProc(void *data) { DEBUG_D("network thread started"); #ifdef ANDROID javaVm->AttachCurrentThread(&jniEnv, NULL); #endif ConnectionsManager *networkManager = (ConnectionsManager *) (data); if (networkManager->currentUserId != 0 && networkManager->pushConnectionEnabled) { Datacenter *datacenter = networkManager->getDatacenterWithId(networkManager->currentDatacenterId); if (datacenter != nullptr) { datacenter->createPushConnection()->setSessionId(networkManager->pushSessionId); networkManager->sendPing(datacenter, true); } } do { networkManager->select(); } while (!done); return nullptr; } void ConnectionsManager::loadConfig() { if (config == nullptr) { config = new Config("tgnet.dat"); } NativeByteBuffer *buffer = config->readConfig(); if (buffer != nullptr) { uint32_t version = buffer->readUint32(nullptr); DEBUG_D("config version = %u", version); if (version <= configVersion) { testBackend = buffer->readBool(nullptr); if (buffer->readBool(nullptr)) { currentDatacenterId = buffer->readUint32(nullptr); timeDifference = buffer->readInt32(nullptr); lastDcUpdateTime = buffer->readInt32(nullptr); pushSessionId = buffer->readInt64(nullptr); if (version >= 2) { registeredForInternalPush = buffer->readBool(nullptr); } DEBUG_D("current dc id = %u, time difference = %d, registered for push = %d", currentDatacenterId, timeDifference, (int32_t) registeredForInternalPush); uint32_t count = buffer->readUint32(nullptr); for (uint32_t a = 0; a < count; a++) { sessionsToDestroy.push_back(buffer->readInt64(nullptr)); } count = buffer->readUint32(nullptr); for (uint32_t a = 0; a < count; a++) { Datacenter *datacenter = new Datacenter(buffer); datacenters[datacenter->getDatacenterId()] = datacenter; DEBUG_D("datacenter(%p) %u loaded (hasAuthKey = %d)", datacenter, datacenter->getDatacenterId(), (int) datacenter->hasAuthKey()); } } } buffer->reuse(); } if (currentDatacenterId != 0 && currentUserId) { Datacenter *datacenter = getDatacenterWithId(currentDatacenterId); if (datacenter == nullptr || !datacenter->hasAuthKey()) { if (datacenter != nullptr) { DEBUG_D("reset authorization because of dc %p", datacenter); } currentDatacenterId = 0; datacenters.clear(); scheduleTask([&] { if (delegate != nullptr) { delegate->onLogout(); } }); } } initDatacenters(); if ((datacenters.size() != 0 && currentDatacenterId == 0) || pushSessionId == 0) { if (pushSessionId == 0) { RAND_bytes((uint8_t *) &pushSessionId, 8); } if (currentDatacenterId == 0) { currentDatacenterId = 2; } saveConfig(); } movingToDatacenterId = DEFAULT_DATACENTER_ID; } void ConnectionsManager::saveConfigInternal(NativeByteBuffer *buffer) { buffer->writeInt32(configVersion); buffer->writeBool(testBackend); Datacenter *currentDatacenter = getDatacenterWithId(currentDatacenterId); buffer->writeBool(currentDatacenter != nullptr); if (currentDatacenter != nullptr) { buffer->writeInt32(currentDatacenterId); buffer->writeInt32(timeDifference); buffer->writeInt32(lastDcUpdateTime); buffer->writeInt64(pushSessionId); buffer->writeBool(registeredForInternalPush); std::vector sessions; currentDatacenter->getSessions(sessions); uint32_t count = (uint32_t) sessions.size(); buffer->writeInt32(count); for (uint32_t a = 0; a < count; a++) { buffer->writeInt64(sessions[a]); } count = (uint32_t) datacenters.size(); buffer->writeInt32(count); for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { iter->second->serializeToStream(buffer); } } } void ConnectionsManager::saveConfig() { if (config == nullptr) { config = new Config("tgnet.dat"); } static NativeByteBuffer *sizeCalculator = new NativeByteBuffer(true); sizeCalculator->clearCapacity(); saveConfigInternal(sizeCalculator); NativeByteBuffer *buffer = BuffersStorage::getInstance().getFreeBuffer(sizeCalculator->capacity()); saveConfigInternal(buffer); config->writeConfig(buffer); buffer->reuse(); } inline NativeByteBuffer *decompressGZip(NativeByteBuffer *data) { int retCode; z_stream stream; memset(&stream, 0, sizeof(z_stream)); stream.avail_in = data->limit(); stream.next_in = data->bytes(); retCode = inflateInit2(&stream, 15 + 32); if (retCode != Z_OK) { DEBUG_E("can't decompress data"); exit(1); } NativeByteBuffer *result = BuffersStorage::getInstance().getFreeBuffer(data->limit() * 4); stream.avail_out = result->capacity(); stream.next_out = result->bytes(); while (1) { retCode = inflate(&stream, Z_NO_FLUSH); if (retCode == Z_STREAM_END) { break; } if (retCode == Z_OK) { NativeByteBuffer *newResult = BuffersStorage::getInstance().getFreeBuffer(result->capacity() * 2); memcpy(newResult->bytes(), result->bytes(), result->capacity()); stream.avail_out = newResult->capacity() - result->capacity(); stream.next_out = newResult->bytes() + result->capacity(); result->reuse(); result = newResult; } else { DEBUG_E("can't decompress data"); exit(1); } } result->limit((uint32_t) stream.total_out); inflateEnd(&stream); return result; } inline NativeByteBuffer *compressGZip(NativeByteBuffer *buffer) { if (buffer == nullptr || buffer->limit() == 0) { return nullptr; } z_stream stream; int retCode; memset(&stream, 0, sizeof(z_stream)); stream.avail_in = buffer->limit(); stream.next_in = buffer->bytes(); retCode = deflateInit2(&stream, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); if (retCode != Z_OK) { DEBUG_E("%s: deflateInit2() failed with error %i", __PRETTY_FUNCTION__, retCode); return nullptr; } NativeByteBuffer *result = BuffersStorage::getInstance().getFreeBuffer(buffer->limit()); stream.avail_out = result->limit(); stream.next_out = result->bytes(); retCode = deflate(&stream, Z_FINISH); if ((retCode != Z_OK) && (retCode != Z_STREAM_END)) { DEBUG_E("%s: deflate() failed with error %i", __PRETTY_FUNCTION__, retCode); deflateEnd(&stream); result->reuse(); return nullptr; } if (retCode != Z_STREAM_END || stream.total_out >= buffer->limit() - 4) { deflateEnd(&stream); result->reuse(); return nullptr; } result->limit((uint32_t) stream.total_out); deflateEnd(&stream); return result; } int64_t ConnectionsManager::getCurrentTimeMillis() { clock_gettime(CLOCK_REALTIME, &timeSpec); return (int64_t) timeSpec.tv_sec * 1000 + (int64_t) timeSpec.tv_nsec / 1000000; } int64_t ConnectionsManager::getCurrentTimeMonotonicMillis() { clock_gettime(CLOCK_MONOTONIC, &timeSpecMonotonic); return (int64_t) timeSpecMonotonic.tv_sec * 1000 + (int64_t) timeSpecMonotonic.tv_nsec / 1000000; } int32_t ConnectionsManager::getCurrentTime() { return (int32_t) (getCurrentTimeMillis() / 1000) + timeDifference; } bool ConnectionsManager::isTestBackend() { return testBackend; } int32_t ConnectionsManager::getTimeDifference() { return timeDifference; } int64_t ConnectionsManager::generateMessageId() { int64_t messageId = (int64_t) ((((double) getCurrentTimeMillis() + ((double) timeDifference) * 1000) * 4294967296.0) / 1000.0); if (messageId <= lastOutgoingMessageId) { messageId = lastOutgoingMessageId + 1; } while (messageId % 4 != 0) { messageId++; } lastOutgoingMessageId = messageId; return messageId; } bool ConnectionsManager::isNetworkAvailable() { return networkAvailable; } void ConnectionsManager::cleanUp() { scheduleTask([&] { for (requestsIter iter = requestsQueue.begin(); iter != requestsQueue.end();) { Request *request = iter->get(); if (request->requestFlags & RequestFlagWithoutLogin) { iter++; continue; } if (request->onCompleteRequestCallback != nullptr) { TL_error *error = new TL_error(); error->code = -1000; error->text = ""; request->onComplete(nullptr, error, 0); delete error; } iter = requestsQueue.erase(iter); } for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end();) { Request *request = iter->get(); if (request->requestFlags & RequestFlagWithoutLogin) { iter++; continue; } if (request->onCompleteRequestCallback != nullptr) { TL_error *error = new TL_error(); error->code = -1000; error->text = ""; request->onComplete(nullptr, error, 0); delete error; } iter = runningRequests.erase(iter); } quickAckIdToRequestIds.clear(); for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { iter->second->recreateSessions(); iter->second->authorized = false; } sessionsToDestroy.clear(); currentUserId = 0; registeredForInternalPush = false; saveConfig(); }); } void ConnectionsManager::onConnectionClosed(Connection *connection, int reason) { Datacenter *datacenter = connection->getDatacenter(); if (connection->getConnectionType() == ConnectionTypeGeneric) { if (proxyAddress.empty()) { if (reason == 2) { disconnectTimeoutAmount += connection->getTimeout(); } else { disconnectTimeoutAmount += 4; } if (disconnectTimeoutAmount >= 20) { if (!connection->hasUsefullData()) { requestingSecondAddress = 0; delegate->onRequestNewServerIpAndPort(requestingSecondAddress); } disconnectTimeoutAmount = 0; } } if (datacenter->isHandshaking()) { datacenter->onHandshakeConnectionClosed(connection); } if (datacenter->getDatacenterId() == currentDatacenterId) { if (networkAvailable) { if (proxyAddress.empty()) { if (connectionState != ConnectionStateConnecting) { connectionState = ConnectionStateConnecting; if (delegate != nullptr) { delegate->onConnectionStateChanged(connectionState); } } } else { if (connectionState != ConnectionStateConnectingViaProxy) { connectionState = ConnectionStateConnectingViaProxy; if (delegate != nullptr) { delegate->onConnectionStateChanged(connectionState); } } } } else { if (connectionState != ConnectionStateWaitingForNetwork) { connectionState = ConnectionStateWaitingForNetwork; if (delegate != nullptr) { delegate->onConnectionStateChanged(connectionState); } } } } } else if (connection->getConnectionType() == ConnectionTypePush) { DEBUG_D("connection(%p) push connection closed", connection); sendingPushPing = false; lastPushPingTime = getCurrentTimeMonotonicMillis() - 60000 * 3 + 4000; } } void ConnectionsManager::onConnectionConnected(Connection *connection) { Datacenter *datacenter = connection->getDatacenter(); if (connection->getConnectionType() == ConnectionTypeGeneric) { if (datacenter->isHandshaking()) { datacenter->onHandshakeConnectionConnected(connection); return; } } if (datacenter->hasAuthKey()) { if (connection->getConnectionType() == ConnectionTypePush) { sendingPushPing = false; lastPushPingTime = getCurrentTimeMonotonicMillis(); sendPing(datacenter, true); } else { if (networkPaused && lastPauseTime != 0) { lastPauseTime = getCurrentTimeMonotonicMillis(); } processRequestQueue(connection->getConnectionType(), datacenter->getDatacenterId()); } } } void ConnectionsManager::onConnectionQuickAckReceived(Connection *connection, int32_t ack) { std::map>::iterator iter = quickAckIdToRequestIds.find(ack); if (iter == quickAckIdToRequestIds.end()) { return; } for (requestsIter iter2 = runningRequests.begin(); iter2 != runningRequests.end(); iter2++) { Request *request = iter2->get(); if (std::find(iter->second.begin(), iter->second.end(), request->requestToken) != iter->second.end()) { request->onQuickAck(); } } quickAckIdToRequestIds.erase(iter); } void ConnectionsManager::onConnectionDataReceived(Connection *connection, NativeByteBuffer *data, uint32_t length) { bool error = false; if (length == 4) { int32_t code = data->readInt32(&error); Datacenter *datacenter = connection->getDatacenter(); if (code == -404 && datacenter->isCdnDatacenter) { datacenter->clear(); DEBUG_D("connection(%p, dc%u, type %d) reset auth key duo to -404 error", connection, datacenter->getDatacenterId(), connection->getConnectionType()); } else { DEBUG_E("mtproto error = %d", code); } connection->reconnect(); return; } uint32_t mark = data->position(); int64_t keyId = data->readInt64(&error); if (error) { connection->reconnect(); return; } Datacenter *datacenter = connection->getDatacenter(); if (connectionState != ConnectionStateConnected && connection->getConnectionType() == ConnectionTypeGeneric && datacenter->getDatacenterId() == currentDatacenterId) { connectionState = ConnectionStateConnected; if (delegate != nullptr) { delegate->onConnectionStateChanged(connectionState); } } if (keyId == 0) { int64_t messageId = data->readInt64(&error); if (error) { connection->reconnect(); return; } if (connection->isMessageIdProcessed(messageId)) { return; } uint32_t messageLength = data->readUint32(&error); if (error) { connection->reconnect(); return; } if (messageLength != data->remaining()) { DEBUG_E("connection(%p) received incorrect message length", connection); connection->reconnect(); return; } TLObject *request; if (datacenter->isHandshaking()) { request = datacenter->getCurrentHandshakeRequest(); } else { request = getRequestWithMessageId(messageId); } TLObject *object = TLdeserialize(request, messageLength, data); if (object != nullptr) { if (datacenter->isHandshaking()) { datacenter->processHandshakeResponse(object, messageId); } else { processServerResponse(object, messageId, 0, 0, connection, 0, 0); connection->addProcessedMessageId(messageId); } connection->setHasUsefullData(); if (connection->getConnectionType() == ConnectionTypeGeneric) { disconnectTimeoutAmount = 0; } delete object; } } else { if (length < 24 + 32 || (length - 24) % 16 != 0 || !datacenter->decryptServerResponse(keyId, data->bytes() + mark + 8, data->bytes() + mark + 24, length - 24)) { DEBUG_E("connection(%p) unable to decrypt server response", connection); connection->reconnect(); return; } data->position(mark + 24); int64_t messageServerSalt = data->readInt64(&error); int64_t messageSessionId = data->readInt64(&error); if (messageSessionId != connection->getSissionId()) { DEBUG_E("connection(%p) received invalid message session id (0x%llx instead of 0x%llx)", connection, (uint64_t) messageSessionId, (uint64_t) connection->getSissionId()); return; } bool doNotProcess = false; int64_t messageId = data->readInt64(&error); int32_t messageSeqNo = data->readInt32(&error); uint32_t messageLength = data->readUint32(&error); if (connection->isMessageIdProcessed(messageId)) { doNotProcess = true; } if (messageSeqNo % 2 != 0) { connection->addMessageToConfirm(messageId); } if (!doNotProcess) { TLObject *object = TLdeserialize(nullptr, messageLength, data); if (object != nullptr) { connection->setHasUsefullData(); if (connection->getConnectionType() == ConnectionTypeGeneric) { disconnectTimeoutAmount = 0; } DEBUG_D("connection(%p, dc%u, type %d) received object %s", connection, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name()); processServerResponse(object, messageId, messageSeqNo, messageServerSalt, connection, 0, 0); connection->addProcessedMessageId(messageId); delete object; if (connection->getConnectionType() == ConnectionTypePush) { std::vector> messages; sendMessagesToConnectionWithConfirmation(messages, connection, false); } } else { if (delegate != nullptr) { delegate->onUnparsedMessageReceived(0, data, connection->getConnectionType()); } } } else { std::vector> messages; sendMessagesToConnectionWithConfirmation(messages, connection, false); } } } TLObject *ConnectionsManager::getRequestWithMessageId(int64_t messageId) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->messageId == messageId) { return request->rawRequest; } } return nullptr; } TLObject *ConnectionsManager::TLdeserialize(TLObject *request, uint32_t bytes, NativeByteBuffer *data) { bool error = false; uint32_t position = data->position(); uint32_t constructor = data->readUint32(&error); if (error) { data->position(position); return nullptr; } TLObject *object = TLClassStore::TLdeserialize(data, bytes, constructor, error); if (error) { if (object != nullptr) { delete object; } data->position(position); return nullptr; } if (object == nullptr) { if (request != nullptr) { TL_api_request *apiRequest = dynamic_cast(request); if (apiRequest != nullptr) { object = apiRequest->deserializeResponse(data, bytes, error); DEBUG_D("api request constructor 0x%x, don't parse", constructor); } else { object = request->deserializeResponse(data, constructor, error); if (object != nullptr && error) { delete object; object = nullptr; } } } else { DEBUG_D("not found request to parse constructor 0x%x", constructor); } } if (object == nullptr) { data->position(position); } return object; } void ConnectionsManager::processServerResponse(TLObject *message, int64_t messageId, int32_t messageSeqNo, int64_t messageSalt, Connection *connection, int64_t innerMsgId, int64_t containerMessageId) { const std::type_info &typeInfo = typeid(*message); Datacenter *datacenter = connection->getDatacenter(); if (typeInfo == typeid(TL_new_session_created)) { TL_new_session_created *response = (TL_new_session_created *) message; if (!connection->isSessionProcessed(response->unique_id)) { DEBUG_D("connection(%p, dc%u, type %d) new session created (first message id: 0x%llx, server salt: 0x%llx, unique id: 0x%llx)", connection, datacenter->getDatacenterId(), connection->getConnectionType(), (uint64_t) response->first_msg_id, (uint64_t) response->server_salt, (uint64_t) response->unique_id); std::unique_ptr salt = std::unique_ptr(new TL_future_salt()); salt->valid_until = salt->valid_since = getCurrentTime(); salt->valid_until += 30 * 60; salt->salt = response->server_salt; datacenter->addServerSalt(salt); for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId); if (request->messageId < response->first_msg_id && request->connectionType & connection->getConnectionType() && requestDatacenter != nullptr && requestDatacenter->getDatacenterId() == datacenter->getDatacenterId()) { DEBUG_D("clear request %p - %s", request->rawRequest, typeid(*request->rawRequest).name()); request->clear(true); } } saveConfig(); if (datacenter->getDatacenterId() == currentDatacenterId && currentUserId) { if (connection->getConnectionType() == ConnectionTypePush) { registerForInternalPushUpdates(); } else if (connection->getConnectionType() == ConnectionTypeGeneric) { if (delegate != nullptr) { delegate->onSessionCreated(); } } } connection->addProcessedSession(response->unique_id); } } else if (typeInfo == typeid(TL_msg_container)) { TL_msg_container *response = (TL_msg_container *) message; size_t count = response->messages.size(); for (uint32_t a = 0; a < count; a++) { TL_message *innerMessage = response->messages[a].get(); int64_t innerMessageId = innerMessage->msg_id; if (innerMessage->seqno % 2 != 0) { connection->addMessageToConfirm(innerMessageId); } if (connection->isMessageIdProcessed(innerMessageId)) { continue; } if (innerMessage->unparsedBody != nullptr) { if (delegate != nullptr) { delegate->onUnparsedMessageReceived(0, innerMessage->unparsedBody.get(), connection->getConnectionType()); } } else { processServerResponse(innerMessage->body.get(), 0, innerMessage->seqno, messageSalt, connection, innerMessageId, messageId); } connection->addProcessedMessageId(innerMessageId); } } else if (typeInfo == typeid(TL_pong)) { if (connection->getConnectionType() == ConnectionTypePush) { if (!registeredForInternalPush) { registerForInternalPushUpdates(); } DEBUG_D("connection(%p, dc%u, type %d) received push ping", connection, datacenter->getDatacenterId(), connection->getConnectionType()); sendingPushPing = false; } else { TL_pong *response = (TL_pong *) message; if (response->ping_id == lastPingId) { int32_t diff = (int32_t) (getCurrentTimeMonotonicMillis() / 1000) - pingTime; if (abs(diff) < 10) { currentPingTime = (diff + currentPingTime) / 2; if (messageId != 0) { int64_t timeMessage = (int64_t) (messageId / 4294967296.0 * 1000); timeDifference = (int32_t) ((timeMessage - getCurrentTimeMillis()) / 1000 - currentPingTime / 2); } } } } } else if (typeInfo == typeid(TL_future_salts)) { TL_future_salts *response = (TL_future_salts *) message; int64_t requestMid = response->req_msg_id; for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->respondsToMessageId(requestMid)) { request->onComplete(response, nullptr, connection->currentNetworkType); request->completed = true; runningRequests.erase(iter); break; } } } else if (dynamic_cast(message)) { DestroySessionRes *response = (DestroySessionRes *) message; DEBUG_D("destroyed session 0x%llx (%s)", (uint64_t) response->session_id, typeInfo == typeid(TL_destroy_session_ok) ? "ok" : "not found"); } else if (typeInfo == typeid(TL_rpc_result)) { TL_rpc_result *response = (TL_rpc_result *) message; int64_t resultMid = response->req_msg_id; bool hasResult = response->result.get() != nullptr; bool ignoreResult = false; if (hasResult) { TLObject *object = response->result.get(); DEBUG_D("connection(%p, dc%u, type %d) received rpc_result with %s", connection, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name()); } RpcError *error = hasResult ? dynamic_cast(response->result.get()) : nullptr; if (error != nullptr) { DEBUG_E("connection(%p, dc%u, type %d) rpc error %d: %s", connection, datacenter->getDatacenterId(), connection->getConnectionType(), error->error_code, error->error_message.c_str()); if (error->error_code == 303) { uint32_t migrateToDatacenterId = DEFAULT_DATACENTER_ID; static std::vector migrateErrors; if (migrateErrors.empty()) { migrateErrors.push_back("NETWORK_MIGRATE_"); migrateErrors.push_back("PHONE_MIGRATE_"); migrateErrors.push_back("USER_MIGRATE_"); } size_t count = migrateErrors.size(); for (uint32_t a = 0; a < count; a++) { std::string &possibleError = migrateErrors[a]; if (error->error_message.find(possibleError) != std::string::npos) { std::string num = error->error_message.substr(possibleError.size(), error->error_message.size() - possibleError.size()); uint32_t val = (uint32_t) atoi(num.c_str()); migrateToDatacenterId = val; } } if (migrateToDatacenterId != DEFAULT_DATACENTER_ID) { ignoreResult = true; moveToDatacenter(migrateToDatacenterId); } } } uint32_t retryRequestsFromDatacenter = DEFAULT_DATACENTER_ID - 1; uint32_t retryRequestsConnections = 0; if (!ignoreResult) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->respondsToMessageId(resultMid)) { DEBUG_D("got response for request %p - %s", request->rawRequest, typeid(*request->rawRequest).name()); bool discardResponse = false; bool isError = false; bool allowInitConnection = true; if (request->onCompleteRequestCallback != nullptr) { TL_error *implicitError = nullptr; NativeByteBuffer *unpacked_data = nullptr; TLObject *result = response->result.get(); if (typeid(*result) == typeid(TL_gzip_packed)) { TL_gzip_packed *innerResponse = (TL_gzip_packed *) result; unpacked_data = decompressGZip(innerResponse->packed_data.get()); TLObject *object = TLdeserialize(request->rawRequest, unpacked_data->limit(), unpacked_data); if (object != nullptr) { response->result = std::unique_ptr(object); } else { response->result = std::unique_ptr(nullptr); } } hasResult = response->result.get() != nullptr; error = hasResult ? dynamic_cast(response->result.get()) : nullptr; TL_error *error2 = hasResult ? dynamic_cast(response->result.get()) : nullptr; if (error != nullptr) { allowInitConnection = false; static std::string authRestart = "AUTH_RESTART"; bool processEvenFailed = error->error_code == 500 && error->error_message.find(authRestart) != std::string::npos; DEBUG_E("request %p rpc error %d: %s", request, error->error_code, error->error_message.c_str()); if ((request->requestFlags & RequestFlagFailOnServerErrors) == 0 || processEvenFailed) { if (error->error_code == 500 || error->error_code < 0) { discardResponse = true; request->minStartTime = request->startTime + (request->serverFailureCount > 10 ? 10 : request->serverFailureCount); request->serverFailureCount++; } else if (error->error_code == 420) { int32_t waitTime = 2; static std::string floodWait = "FLOOD_WAIT_"; if (error->error_message.find(floodWait) != std::string::npos) { std::string num = error->error_message.substr(floodWait.size(), error->error_message.size() - floodWait.size()); waitTime = atoi(num.c_str()); if (waitTime <= 0) { waitTime = 2; } } discardResponse = true; request->failedByFloodWait = waitTime; request->startTime = 0; request->minStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000 + waitTime); } else if (error->error_code == 400) { static std::string waitFailed = "MSG_WAIT_FAILED"; if (error->error_message.find(waitFailed) != std::string::npos) { discardResponse = true; request->minStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000 + 1); request->startTime = 0; } } } if (!discardResponse) { implicitError = new TL_error(); implicitError->code = error->error_code; implicitError->text = error->error_message; } } else if (error2 == nullptr) { if (request->rawRequest == nullptr || response->result == nullptr) { allowInitConnection = false; DEBUG_E("rawRequest is null"); implicitError = new TL_error(); implicitError->code = -1000; implicitError->text = ""; } } if (!discardResponse) { if (implicitError != nullptr || error2 != nullptr) { isError = true; request->onComplete(nullptr, implicitError != nullptr ? implicitError : error2, connection->currentNetworkType); if (error2 != nullptr) { delete error2; } } else { request->onComplete(response->result.get(), nullptr, connection->currentNetworkType); } } if (implicitError != nullptr && implicitError->code == 401) { allowInitConnection = false; isError = true; static std::string sessionPasswordNeeded = "SESSION_PASSWORD_NEEDED"; if (implicitError->text.find(sessionPasswordNeeded) != std::string::npos) { //ignore this error } else if (datacenter->getDatacenterId() == currentDatacenterId || datacenter->getDatacenterId() == movingToDatacenterId) { if (request->connectionType & ConnectionTypeGeneric && currentUserId) { currentUserId = 0; if (delegate != nullptr) { delegate->onLogout(); } cleanUp(); } } else { datacenter->authorized = false; saveConfig(); discardResponse = true; if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) { retryRequestsFromDatacenter = datacenter->datacenterId; retryRequestsConnections = request->connectionType; } } } if (unpacked_data != nullptr) { unpacked_data->reuse(); } if (implicitError != nullptr) { delete implicitError; } } if (!discardResponse) { if (allowInitConnection && request->isInitRequest && !isError) { if (datacenter->lastInitVersion != currentVersion) { datacenter->lastInitVersion = currentVersion; saveConfig(); DEBUG_D("dc%d init connection completed", datacenter->getDatacenterId()); } else { DEBUG_D("dc%d rpc is init, but init connection already completed", datacenter->getDatacenterId()); } } request->completed = true; removeRequestFromGuid(request->requestToken); runningRequests.erase(iter); } else { request->messageId = 0; request->messageSeqNo = 0; request->connectionToken = 0; } break; } } } if (retryRequestsFromDatacenter != DEFAULT_DATACENTER_ID - 1) { processRequestQueue(retryRequestsConnections, retryRequestsFromDatacenter); } else { processRequestQueue(0, 0); } } else if (typeInfo == typeid(TL_msgs_ack)) { } else if (typeInfo == typeid(TL_bad_msg_notification)) { TL_bad_msg_notification *result = (TL_bad_msg_notification *) message; DEBUG_E("bad message notification %d for messageId 0x%llx, seqno %d", result->error_code, result->bad_msg_id, result->bad_msg_seqno); switch (result->error_code) { case 16: case 17: case 19: case 32: case 33: case 64: { int64_t realId = messageId != 0 ? messageId : containerMessageId; if (realId == 0) { realId = innerMsgId; } if (realId != 0) { int64_t time = (int64_t) (messageId / 4294967296.0 * 1000); int64_t currentTime = getCurrentTimeMillis(); timeDifference = (int32_t) ((time - currentTime) / 1000 - currentPingTime / 2); } datacenter->recreateSessions(); saveConfig(); lastOutgoingMessageId = 0; clearRequestsForDatacenter(datacenter); break; } case 20: { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->respondsToMessageId(result->bad_msg_id)) { if (request->completed) { break; } connection->addMessageToConfirm(result->bad_msg_id); request->clear(true); break; } } } default: break; } } else if (typeInfo == typeid(TL_bad_server_salt)) { TL_bad_server_salt *response = (TL_bad_server_salt *) message; if (messageId != 0) { int64_t time = (int64_t) (messageId / 4294967296.0 * 1000); int64_t currentTime = getCurrentTimeMillis(); timeDifference = (int32_t) ((time - currentTime) / 1000 - currentPingTime / 2); lastOutgoingMessageId = messageId > (lastOutgoingMessageId ? messageId : lastOutgoingMessageId); } int64_t resultMid = response->bad_msg_id; if (resultMid != 0) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if ((request->connectionType & ConnectionTypeDownload) == 0) { continue; } Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId); if (requestDatacenter != nullptr && requestDatacenter->getDatacenterId() == datacenter->getDatacenterId()) { request->retryCount = 0; request->failedBySalt = true; } } } datacenter->clearServerSalts(); std::unique_ptr salt = std::unique_ptr(new TL_future_salt()); salt->valid_until = salt->valid_since = getCurrentTime(); salt->valid_until += 30 * 60; salt->salt = messageSalt; datacenter->addServerSalt(salt); saveConfig(); requestSaltsForDatacenter(datacenter); if (datacenter->hasAuthKey()) { processRequestQueue(AllConnectionTypes, datacenter->getDatacenterId()); } } else if (typeInfo == typeid(MsgsStateInfo)) { MsgsStateInfo *response = (MsgsStateInfo *) message; DEBUG_D("connection(%p, dc%u, type %d) got %s for messageId 0x%llx", connection, datacenter->getDatacenterId(), connection->getConnectionType(), typeInfo.name(), response->req_msg_id); std::map::iterator mIter = resendRequests.find(response->req_msg_id); if (mIter != resendRequests.end()) { DEBUG_D("found resend for messageId 0x%llx", mIter->second); connection->addMessageToConfirm(mIter->second); for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->respondsToMessageId(mIter->second)) { if (request->completed) { break; } request->clear(true); break; } } resendRequests.erase(mIter); } } else if (dynamic_cast(message)) { MsgDetailedInfo *response = (MsgDetailedInfo *) message; bool requestResend = false; bool confirm = true; DEBUG_D("connection(%p, dc%u, type %d) got %s for messageId 0x%llx", connection, datacenter->getDatacenterId(), connection->getConnectionType(), typeInfo.name(), response->msg_id); if (typeInfo == typeid(TL_msg_detailed_info)) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->respondsToMessageId(response->msg_id)) { if (request->completed) { break; } DEBUG_D("got TL_msg_detailed_info for rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name()); int32_t currentTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000); if (request->lastResendTime == 0 || abs(currentTime - request->lastResendTime) >= 60) { request->lastResendTime = currentTime; requestResend = true; } else { confirm = false; } break; } } } else { if (!connection->isMessageIdProcessed(messageId)) { requestResend = true; } } if (requestResend) { TL_msg_resend_req *request = new TL_msg_resend_req(); request->msg_ids.push_back(response->answer_msg_id); NetworkMessage *networkMessage = new NetworkMessage(); networkMessage->message = std::unique_ptr(new TL_message()); networkMessage->message->msg_id = generateMessageId(); networkMessage->message->bytes = request->getObjectSize(); networkMessage->message->body = std::unique_ptr(request); networkMessage->message->seqno = connection->generateMessageSeqNo(false); resendRequests[networkMessage->message->msg_id] = response->answer_msg_id; std::vector> array; array.push_back(std::unique_ptr(networkMessage)); sendMessagesToConnection(array, connection, false); } else if (confirm) { connection->addMessageToConfirm(response->answer_msg_id); } } else if (typeInfo == typeid(TL_gzip_packed)) { TL_gzip_packed *response = (TL_gzip_packed *) message; NativeByteBuffer *data = decompressGZip(response->packed_data.get()); TLObject *object = TLdeserialize(getRequestWithMessageId(messageId), data->limit(), data); if (object != nullptr) { DEBUG_D("connection(%p, dc%u, type %d) received object %s", connection, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name()); processServerResponse(object, messageId, messageSeqNo, messageSalt, connection, innerMsgId, containerMessageId); delete object; } else { if (delegate != nullptr) { delegate->onUnparsedMessageReceived(messageId, data, connection->getConnectionType()); } } data->reuse(); } else if (typeInfo == typeid(TL_updatesTooLong)) { if (connection->connectionType == ConnectionTypePush) { if (networkPaused) { lastPauseTime = getCurrentTimeMonotonicMillis(); DEBUG_D("received internal push: wakeup network in background"); } else if (lastPauseTime != 0) { lastPauseTime = getCurrentTimeMonotonicMillis(); DEBUG_D("received internal push: reset sleep timeout"); } else { DEBUG_D("received internal push"); } if (delegate != nullptr) { delegate->onInternalPushReceived(); } } else { if (delegate != nullptr) { NativeByteBuffer *data = BuffersStorage::getInstance().getFreeBuffer(message->getObjectSize()); message->serializeToStream(data); data->position(0); delegate->onUnparsedMessageReceived(0, data, connection->getConnectionType()); data->reuse(); } } } } void ConnectionsManager::sendPing(Datacenter *datacenter, bool usePushConnection) { if (usePushConnection && (currentUserId == 0 || !usePushConnection)) { return; } Connection *connection = nullptr; if (usePushConnection) { connection = datacenter->getPushConnection(true); } else { connection = datacenter->getGenericConnection(true); } if (connection == nullptr || (!usePushConnection && connection->getConnectionToken() == 0)) { return; } TL_ping_delay_disconnect *request = new TL_ping_delay_disconnect(); request->ping_id = ++lastPingId; if (usePushConnection) { request->disconnect_delay = 60 * 7; } else { request->disconnect_delay = 35; pingTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000); } NetworkMessage *networkMessage = new NetworkMessage(); networkMessage->message = std::unique_ptr(new TL_message()); networkMessage->message->msg_id = generateMessageId(); networkMessage->message->bytes = request->getObjectSize(); networkMessage->message->body = std::unique_ptr(request); networkMessage->message->seqno = connection->generateMessageSeqNo(false); std::vector> array; array.push_back(std::unique_ptr(networkMessage)); NativeByteBuffer *transportData = datacenter->createRequestsData(array, nullptr, connection); if (usePushConnection) { DEBUG_D("dc%d send ping to push connection", datacenter->getDatacenterId()); sendingPushPing = true; } connection->sendData(transportData, false); } bool ConnectionsManager::isIpv6Enabled() { return ipv6Enabled; } void ConnectionsManager::initDatacenters() { Datacenter *datacenter; if (!testBackend) { if (datacenters.find(1) == datacenters.end()) { datacenter = new Datacenter(1); datacenter->addAddressAndPort("149.154.175.50", 443, 0); datacenter->addAddressAndPort("2001:b28:f23d:f001:0000:0000:0000:000a", 443, 1); datacenters[1] = datacenter; } if (datacenters.find(2) == datacenters.end()) { datacenter = new Datacenter(2); datacenter->addAddressAndPort("149.154.167.51", 443, 0); datacenter->addAddressAndPort("2001:67c:4e8:f002:0000:0000:0000:000a", 443, 1); datacenters[2] = datacenter; } if (datacenters.find(3) == datacenters.end()) { datacenter = new Datacenter(3); datacenter->addAddressAndPort("149.154.175.100", 443, 0); datacenter->addAddressAndPort("2001:b28:f23d:f003:0000:0000:0000:000a", 443, 1); datacenters[3] = datacenter; } if (datacenters.find(4) == datacenters.end()) { datacenter = new Datacenter(4); datacenter->addAddressAndPort("149.154.167.91", 443, 0); datacenter->addAddressAndPort("2001:67c:4e8:f004:0000:0000:0000:000a", 443, 1); datacenters[4] = datacenter; } if (datacenters.find(5) == datacenters.end()) { datacenter = new Datacenter(5); datacenter->addAddressAndPort("149.154.171.5", 443, 0); datacenter->addAddressAndPort("2001:b28:f23f:f005:0000:0000:0000:000a", 443, 1); datacenters[5] = datacenter; } } else { if (datacenters.find(1) == datacenters.end()) { datacenter = new Datacenter(1); datacenter->addAddressAndPort("149.154.175.40", 443, 0); datacenter->addAddressAndPort("2001:b28:f23d:f001:0000:0000:0000:000e", 443, 1); datacenters[1] = datacenter; } if (datacenters.find(2) == datacenters.end()) { datacenter = new Datacenter(2); datacenter->addAddressAndPort("149.154.167.40", 443, 0); datacenter->addAddressAndPort("2001:67c:4e8:f002:0000:0000:0000:000e", 443, 1); datacenters[2] = datacenter; } if (datacenters.find(3) == datacenters.end()) { datacenter = new Datacenter(3); datacenter->addAddressAndPort("149.154.175.117", 443, 0); datacenter->addAddressAndPort("2001:b28:f23d:f003:0000:0000:0000:000e", 443, 1); datacenters[3] = datacenter; } } } void ConnectionsManager::attachConnection(ConnectionSocket *connection) { if (std::find(activeConnections.begin(), activeConnections.end(), connection) != activeConnections.end()) { return; } activeConnections.push_back(connection); } void ConnectionsManager::detachConnection(ConnectionSocket *connection) { std::vector::iterator iter = std::find(activeConnections.begin(), activeConnections.end(), connection); if (iter != activeConnections.end()) { activeConnections.erase(iter); } } int32_t ConnectionsManager::sendRequestInternal(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate) { if (!currentUserId && !(flags & RequestFlagWithoutLogin)) { DEBUG_D("can't do request without login %s", typeid(*object).name()); delete object; return 0; } Request *request = new Request(lastRequestToken++, connetionType, flags, datacenterId, onComplete, onQuickAck, nullptr); request->rawRequest = object; request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request); requestsQueue.push_back(std::unique_ptr(request)); if (immediate) { processRequestQueue(0, 0); } return request->requestToken; } int32_t ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate) { int32_t requestToken = lastRequestToken++; sendRequest(object, onComplete, onQuickAck, flags, datacenterId, connetionType, immediate, requestToken); return requestToken; } void ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate, int32_t requestToken) { if (!currentUserId && !(flags & RequestFlagWithoutLogin)) { DEBUG_D("can't do request without login %s", typeid(*object).name()); delete object; return; } scheduleTask([&, requestToken, object, onComplete, onQuickAck, flags, datacenterId, connetionType, immediate] { Request *request = new Request(requestToken, connetionType, flags, datacenterId, onComplete, onQuickAck, nullptr); request->rawRequest = object; request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request); requestsQueue.push_back(std::unique_ptr(request)); if (immediate) { processRequestQueue(0, 0); } }); } #ifdef ANDROID void ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, onWriteToSocketFunc onWriteToSocket, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate, int32_t requestToken, jobject ptr1, jobject ptr2, jobject ptr3) { if (!currentUserId && !(flags & RequestFlagWithoutLogin)) { DEBUG_D("can't do request without login %s", typeid(*object).name()); delete object; JNIEnv *env = 0; if (javaVm->GetEnv((void **) &env, JNI_VERSION_1_6) != JNI_OK) { DEBUG_E("can't get jnienv"); exit(1); } if (ptr1 != nullptr) { env->DeleteGlobalRef(ptr1); ptr1 = nullptr; } if (ptr2 != nullptr) { env->DeleteGlobalRef(ptr2); ptr2 = nullptr; } if (ptr3 != nullptr) { env->DeleteGlobalRef(ptr3); ptr3 = nullptr; } return; } scheduleTask([&, requestToken, object, onComplete, onQuickAck, onWriteToSocket, flags, datacenterId, connetionType, immediate, ptr1, ptr2, ptr3] { DEBUG_D("send request %p - %s", object, typeid(*object).name()); Request *request = new Request(requestToken, connetionType, flags, datacenterId, onComplete, onQuickAck, onWriteToSocket); request->rawRequest = object; request->ptr1 = ptr1; request->ptr2 = ptr2; request->ptr3 = ptr3; request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request); DEBUG_D("send request wrapped %p - %s", request->rpcRequest.get(), typeid(*(request->rpcRequest.get())).name()); requestsQueue.push_back(std::unique_ptr(request)); if (immediate) { processRequestQueue(0, 0); } }); } #endif void ConnectionsManager::cancelRequestsForGuid(int32_t guid) { scheduleTask([&, guid] { std::map>::iterator iter = requestsByGuids.find(guid); if (iter != requestsByGuids.end()) { std::vector &requests = iter->second; size_t count = requests.size(); for (uint32_t a = 0; a < count; a++) { cancelRequestInternal(requests[a], true, false); std::map::iterator iter2 = guidsByRequests.find(requests[a]); if (iter2 != guidsByRequests.end()) { guidsByRequests.erase(iter2); } } requestsByGuids.erase(iter); } }); } void ConnectionsManager::bindRequestToGuid(int32_t requestToken, int32_t guid) { scheduleTask([&, requestToken, guid] { std::map>::iterator iter = requestsByGuids.find(guid); if (iter != requestsByGuids.end()) { iter->second.push_back(requestToken); } else { std::vector array; array.push_back(requestToken); requestsByGuids[guid] = array; } guidsByRequests[requestToken] = guid; }); } void ConnectionsManager::setUserId(int32_t userId) { scheduleTask([&, userId] { int32_t oldUserId = currentUserId; currentUserId = userId; if (oldUserId == userId && userId != 0) { registerForInternalPushUpdates(); } if (currentUserId != userId && userId != 0) { updateDcSettings(0, false); } if (currentUserId != 0 && pushConnectionEnabled) { Datacenter *datacenter = getDatacenterWithId(currentDatacenterId); if (datacenter != nullptr) { datacenter->createPushConnection()->setSessionId(pushSessionId); sendPing(datacenter, true); } } }); } void ConnectionsManager::switchBackend() { scheduleTask([&] { currentDatacenterId = 1; testBackend = !testBackend; datacenters.clear(); initDatacenters(); saveConfig(); exit(1); }); } void ConnectionsManager::removeRequestFromGuid(int32_t requestToken) { std::map::iterator iter2 = guidsByRequests.find(requestToken); if (iter2 != guidsByRequests.end()) { std::map>::iterator iter = requestsByGuids.find(iter2->first); if (iter != requestsByGuids.end()) { std::vector::iterator iter3 = std::find(iter->second.begin(), iter->second.end(), iter->first); if (iter3 != iter->second.end()) { iter->second.erase(iter3); if (iter->second.empty()) { requestsByGuids.erase(iter); } } } guidsByRequests.erase(iter2); } } bool ConnectionsManager::cancelRequestInternal(int32_t token, bool notifyServer, bool removeFromClass) { for (requestsIter iter = requestsQueue.begin(); iter != requestsQueue.end(); iter++) { Request *request = iter->get(); if (request->requestToken == token) { request->cancelled = true; DEBUG_D("cancelled queued rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name()); requestsQueue.erase(iter); if (removeFromClass) { removeRequestFromGuid(token); } return true; } } for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); if (request->requestToken == token) { if (notifyServer) { TL_rpc_drop_answer *dropAnswer = new TL_rpc_drop_answer(); dropAnswer->req_msg_id = request->messageId; sendRequest(dropAnswer, nullptr, nullptr, RequestFlagEnableUnauthorized | RequestFlagWithoutLogin | RequestFlagFailOnServerErrors, request->datacenterId, request->connectionType, true); } request->cancelled = true; DEBUG_D("cancelled running rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name()); runningRequests.erase(iter); if (removeFromClass) { removeRequestFromGuid(token); } return true; } } return false; } void ConnectionsManager::cancelRequest(int32_t token, bool notifyServer) { if (token == 0) { return; } scheduleTask([&, token, notifyServer] { cancelRequestInternal(token, notifyServer, true); }); } void ConnectionsManager::onDatacenterHandshakeComplete(Datacenter *datacenter, int32_t timeDiff) { saveConfig(); uint32_t datacenterId = datacenter->getDatacenterId(); if (datacenterId == currentDatacenterId || datacenterId == movingToDatacenterId) { timeDifference = timeDiff; datacenter->recreateSessions(); clearRequestsForDatacenter(datacenter); } processRequestQueue(AllConnectionTypes, datacenterId); } void ConnectionsManager::onDatacenterExportAuthorizationComplete(Datacenter *datacenter) { saveConfig(); processRequestQueue(AllConnectionTypes, datacenter->getDatacenterId()); } void ConnectionsManager::sendMessagesToConnection(std::vector> &messages, Connection *connection, bool reportAck) { if (messages.empty() || connection == nullptr) { return; } std::vector> currentMessages; Datacenter *datacenter = connection->getDatacenter(); uint32_t currentSize = 0; size_t count = messages.size(); for (uint32_t a = 0; a < count; a++) { NetworkMessage *networkMessage = messages[a].get(); currentMessages.push_back(std::move(messages[a])); currentSize += networkMessage->message->bytes; if (currentSize >= 3 * 1024 || a == count - 1) { int32_t quickAckId = 0; NativeByteBuffer *transportData = datacenter->createRequestsData(currentMessages, reportAck ? &quickAckId : nullptr, connection); if (transportData != nullptr) { if (reportAck && quickAckId != 0) { std::vector requestIds; size_t count2 = currentMessages.size(); for (uint32_t b = 0; b < count2; b++) { NetworkMessage *message = currentMessages[b].get(); if (message->requestId != 0) { requestIds.push_back(message->requestId); } } if (!requestIds.empty()) { std::map>::iterator iter = quickAckIdToRequestIds.find(quickAckId); if (iter == quickAckIdToRequestIds.end()) { quickAckIdToRequestIds[quickAckId] = requestIds; } else { iter->second.insert(iter->second.end(), requestIds.begin(), requestIds.end()); } } } connection->sendData(transportData, reportAck); } else { DEBUG_E("connection(%p) connection data is empty", connection); } currentSize = 0; currentMessages.clear(); } } } void ConnectionsManager::sendMessagesToConnectionWithConfirmation(std::vector> &messages, Connection *connection, bool reportAck) { NetworkMessage *networkMessage = connection->generateConfirmationRequest(); if (networkMessage != nullptr) { messages.push_back(std::unique_ptr(networkMessage)); } sendMessagesToConnection(messages, connection, reportAck); } void ConnectionsManager::requestSaltsForDatacenter(Datacenter *datacenter) { if (std::find(requestingSaltsForDc.begin(), requestingSaltsForDc.end(), datacenter->getDatacenterId()) != requestingSaltsForDc.end()) { return; } requestingSaltsForDc.push_back(datacenter->getDatacenterId()); TL_get_future_salts *request = new TL_get_future_salts(); request->num = 32; sendRequest(request, [&, datacenter](TLObject *response, TL_error *error, int32_t networkType) { std::vector::iterator iter = std::find(requestingSaltsForDc.begin(), requestingSaltsForDc.end(), datacenter->getDatacenterId()); if (iter != requestingSaltsForDc.end()) { requestingSaltsForDc.erase(iter); } if (error == nullptr) { TL_future_salts *res = (TL_future_salts *) response; datacenter->mergeServerSalts(res->salts); saveConfig(); } }, nullptr, RequestFlagWithoutLogin | RequestFlagEnableUnauthorized, datacenter->getDatacenterId(), ConnectionTypeGeneric, true); } void ConnectionsManager::clearRequestsForDatacenter(Datacenter *datacenter) { for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end(); iter++) { Request *request = iter->get(); Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId); if (requestDatacenter->getDatacenterId() == datacenter->getDatacenterId()) { request->clear(true); } } } void ConnectionsManager::registerForInternalPushUpdates() { if (registeringForPush || !currentUserId) { return; } registeredForInternalPush = false; registeringForPush = true; TL_account_registerDevice *request = new TL_account_registerDevice(); request->token_type = 7; request->token = to_string_uint64(pushSessionId); sendRequest(request, [&](TLObject *response, TL_error *error, int32_t networkType) { if (error == nullptr) { registeredForInternalPush = true; DEBUG_D("registered for internal push"); } else { registeredForInternalPush = false; DEBUG_E("unable to registering for internal push"); } saveConfig(); registeringForPush = false; }, nullptr, 0, DEFAULT_DATACENTER_ID, ConnectionTypeGeneric, true); } inline void addMessageToDatacenter(uint32_t datacenterId, NetworkMessage *networkMessage, std::map>> &messagesToDatacenters) { std::map>>::iterator iter = messagesToDatacenters.find(datacenterId); if (iter == messagesToDatacenters.end()) { std::vector> &array = messagesToDatacenters[datacenterId] = std::vector>(); array.push_back(std::unique_ptr(networkMessage)); } else { iter->second.push_back(std::unique_ptr(networkMessage)); } } void ConnectionsManager::processRequestQueue(uint32_t connectionTypes, uint32_t dc) { static std::map>> genericMessagesToDatacenters; static std::map>> tempMessagesToDatacenters; static std::vector unknownDatacenterIds; static std::vector neededDatacenters; static std::vector unauthorizedDatacenters; genericMessagesToDatacenters.clear(); tempMessagesToDatacenters.clear(); unknownDatacenterIds.clear(); neededDatacenters.clear(); unauthorizedDatacenters.clear(); Connection *genericConnection = nullptr; Datacenter *defaultDatacenter = getDatacenterWithId(currentDatacenterId); if (defaultDatacenter != nullptr) { genericConnection = defaultDatacenter->getGenericConnection(true); } int32_t currentTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000); uint32_t genericRunningRequestCount = 0; uint32_t uploadRunningRequestCount = 0; uint32_t downloadRunningRequestCount = 0; for (requestsIter iter = runningRequests.begin(); iter != runningRequests.end();) { Request *request = iter->get(); const std::type_info &typeInfo = typeid(*request->rawRequest); switch (request->connectionType & 0x0000ffff) { case ConnectionTypeGeneric: genericRunningRequestCount++; break; case ConnectionTypeDownload: downloadRunningRequestCount++; break; case ConnectionTypeUpload: uploadRunningRequestCount++; break; default: break; } uint32_t datacenterId = request->datacenterId; if (datacenterId == DEFAULT_DATACENTER_ID) { if (movingToDatacenterId != DEFAULT_DATACENTER_ID) { iter++; continue; } datacenterId = currentDatacenterId; } if (request->requestFlags & RequestFlagTryDifferentDc) { int32_t requestStartTime = request->startTime; int32_t timeout = 30; if (updatingDcSettings && dynamic_cast(request->rawRequest)) { requestStartTime = updatingDcStartTime; updatingDcStartTime = currentTime; timeout = 60; } if (request->startTime != 0 && abs(currentTime - requestStartTime) >= timeout) { DEBUG_D("move %s to requestsQueue", typeid(*request->rawRequest).name()); requestsQueue.push_back(std::move(*iter)); iter = runningRequests.erase(iter); continue; } } Datacenter *requestDatacenter = getDatacenterWithId(datacenterId); if (requestDatacenter == nullptr) { if (std::find(unknownDatacenterIds.begin(), unknownDatacenterIds.end(), datacenterId) == unknownDatacenterIds.end()) { unknownDatacenterIds.push_back(datacenterId); } iter++; continue; } else { if (requestDatacenter->isCdnDatacenter) { request->requestFlags |= RequestFlagEnableUnauthorized; } if (requestDatacenter->lastInitVersion != currentVersion && !request->isInitRequest && request->rawRequest->isNeedLayer()) { DEBUG_D("move %p - %s to requestsQueue because of initConnection", request->rawRequest, typeid(*request->rawRequest).name()); requestsQueue.push_back(std::move(*iter)); iter = runningRequests.erase(iter); continue; } if (!requestDatacenter->hasAuthKey()) { if (std::find(neededDatacenters.begin(), neededDatacenters.end(), requestDatacenter) == neededDatacenters.end()) { neededDatacenters.push_back(requestDatacenter); } iter++; continue; } else if (!(request->requestFlags & RequestFlagEnableUnauthorized) && !requestDatacenter->authorized && request->datacenterId != DEFAULT_DATACENTER_ID && request->datacenterId != currentDatacenterId) { if (std::find(unauthorizedDatacenters.begin(), unauthorizedDatacenters.end(), requestDatacenter) == unauthorizedDatacenters.end()) { unauthorizedDatacenters.push_back(requestDatacenter); } iter++; continue; } } Connection *connection = requestDatacenter->getConnectionByType(request->connectionType, true); int32_t maxTimeout = request->connectionType & ConnectionTypeGeneric ? 8 : 30; if (!networkAvailable || connection->getConnectionToken() == 0) { iter++; continue; } uint32_t requestConnectionType = request->connectionType & 0x0000ffff; bool forceThisRequest = (connectionTypes & requestConnectionType) && requestDatacenter->getDatacenterId() == dc; if (typeInfo == typeid(TL_get_future_salts) || typeInfo == typeid(TL_destroy_session)) { if (request->messageId != 0) { request->addRespondMessageId(request->messageId); } request->clear(false); forceThisRequest = false; } if (forceThisRequest || (abs(currentTime - request->startTime) > maxTimeout && (currentTime >= request->minStartTime || (request->failedByFloodWait != 0 && (request->minStartTime - currentTime) > request->failedByFloodWait) || (request->failedByFloodWait == 0 && abs(currentTime - request->minStartTime) >= 60)) ) ) { if (!forceThisRequest && request->connectionToken > 0) { if ((request->connectionType & ConnectionTypeGeneric || request->connectionType & ConnectionTypeTemp) && request->connectionToken == connection->getConnectionToken()) { DEBUG_D("request token is valid, not retrying %s", typeInfo.name()); iter++; continue; } else { if (connection->getConnectionToken() != 0 && request->connectionToken == connection->getConnectionToken()) { DEBUG_D("request download token is valid, not retrying %s", typeInfo.name()); iter++; continue; } } } if (request->connectionToken != 0 && request->connectionToken != connection->getConnectionToken()) { request->lastResendTime = 0; } request->retryCount++; if (!request->failedBySalt) { if (request->connectionType & ConnectionTypeDownload) { uint32_t retryMax = 10; if (!(request->requestFlags & RequestFlagForceDownload)) { if (request->failedByFloodWait) { retryMax = 1; } else { retryMax = 6; } } if (request->retryCount >= retryMax) { DEBUG_E("timed out %s", typeInfo.name()); TL_error *error = new TL_error(); error->code = -123; error->text = "RETRY_LIMIT"; request->onComplete(nullptr, error, connection->currentNetworkType); delete error; iter = runningRequests.erase(iter); continue; } } } else { request->failedBySalt = false; } if (request->messageSeqNo == 0) { request->messageSeqNo = connection->generateMessageSeqNo(true); request->messageId = generateMessageId(); } request->startTime = currentTime; NetworkMessage *networkMessage = new NetworkMessage(); networkMessage->message = std::unique_ptr(new TL_message()); networkMessage->message->msg_id = request->messageId; networkMessage->message->bytes = request->serializedLength; networkMessage->message->outgoingBody = request->getRpcRequest(); networkMessage->message->seqno = request->messageSeqNo; networkMessage->requestId = request->requestToken; networkMessage->invokeAfter = (request->requestFlags & RequestFlagInvokeAfter) != 0; networkMessage->needQuickAck = (request->requestFlags & RequestFlagNeedQuickAck) != 0; request->connectionToken = connection->getConnectionToken(); switch (requestConnectionType) { case ConnectionTypeGeneric: addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters); break; case ConnectionTypeTemp: addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, tempMessagesToDatacenters); break; case ConnectionTypeDownload: case ConnectionTypeUpload: { std::vector> array; array.push_back(std::unique_ptr(networkMessage)); sendMessagesToConnectionWithConfirmation(array, connection, false); request->onWriteToSocket(); break; } default: delete networkMessage; } } iter++; } if (genericConnection != nullptr && !sessionsToDestroy.empty() && genericConnection->getConnectionToken() != 0) { std::vector::iterator iter = sessionsToDestroy.begin(); sessionsToDestroy.erase(iter); if (abs(currentTime - lastDestroySessionRequestTime) > 2) { lastDestroySessionRequestTime = currentTime; TL_destroy_session *request = new TL_destroy_session(); request->session_id = *iter; NetworkMessage *networkMessage = new NetworkMessage(); networkMessage->message = std::unique_ptr(new TL_message()); networkMessage->message->msg_id = generateMessageId(); networkMessage->message->bytes = request->getObjectSize(); networkMessage->message->body = std::unique_ptr(request); networkMessage->message->seqno = genericConnection->generateMessageSeqNo(false); addMessageToDatacenter(defaultDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters); } } for (requestsIter iter = requestsQueue.begin(); iter != requestsQueue.end();) { Request *request = iter->get(); if (request->cancelled) { iter = requestsQueue.erase(iter); continue; } uint32_t datacenterId = request->datacenterId; if (datacenterId == DEFAULT_DATACENTER_ID) { if (movingToDatacenterId != DEFAULT_DATACENTER_ID) { iter++; continue; } datacenterId = currentDatacenterId; } if (request->requestFlags & RequestFlagTryDifferentDc) { int32_t requestStartTime = request->startTime; int32_t timeout = 30; if (updatingDcSettings && dynamic_cast(request->rawRequest)) { requestStartTime = updatingDcStartTime; timeout = 60; } else { request->startTime = 0; } if (requestStartTime != 0 && abs(currentTime - requestStartTime) >= timeout) { std::vector allDc; for (std::map::iterator iter2 = datacenters.begin(); iter2 != datacenters.end(); iter2++) { if (iter2->first == datacenterId || iter2->second->isCdnDatacenter) { continue; } allDc.push_back(iter2->first); } uint8_t index; RAND_bytes(&index, 1); datacenterId = allDc[index % allDc.size()]; if (dynamic_cast(request->rawRequest)) { updatingDcStartTime = currentTime; request->datacenterId = datacenterId; } else { currentDatacenterId = datacenterId; } } } Datacenter *requestDatacenter = getDatacenterWithId(datacenterId); if (requestDatacenter == nullptr) { if (std::find(unknownDatacenterIds.begin(), unknownDatacenterIds.end(), datacenterId) == unknownDatacenterIds.end()) { unknownDatacenterIds.push_back(datacenterId); } iter++; continue; } else { if (requestDatacenter->lastInitVersion != currentVersion && !request->isInitRequest) { request->rpcRequest.release(); request->rpcRequest = wrapInLayer(request->rawRequest, requestDatacenter, request); } if (!requestDatacenter->hasAuthKey()) { if (std::find(neededDatacenters.begin(), neededDatacenters.end(), requestDatacenter) == neededDatacenters.end()) { neededDatacenters.push_back(requestDatacenter); } iter++; continue; } else if (!(request->requestFlags & RequestFlagEnableUnauthorized) && !requestDatacenter->authorized && request->datacenterId != DEFAULT_DATACENTER_ID && request->datacenterId != currentDatacenterId) { if (std::find(unauthorizedDatacenters.begin(), unauthorizedDatacenters.end(), requestDatacenter) == unauthorizedDatacenters.end()) { unauthorizedDatacenters.push_back(requestDatacenter); } iter++; continue; } } Connection *connection = requestDatacenter->getConnectionByType(request->connectionType, true); if (request->connectionType & ConnectionTypeGeneric && connection->getConnectionToken() == 0) { iter++; continue; } switch (request->connectionType & 0x0000ffff) { case ConnectionTypeGeneric: if (genericRunningRequestCount >= 60) { iter++; continue; } genericRunningRequestCount++; break; case ConnectionTypeDownload: if (!networkAvailable || downloadRunningRequestCount >= 5) { iter++; continue; } downloadRunningRequestCount++; break; case ConnectionTypeUpload: if (!networkAvailable || uploadRunningRequestCount >= 5) { iter++; continue; } uploadRunningRequestCount++; break; default: break; } uint32_t requestLength = request->rpcRequest->getObjectSize(); if (request->requestFlags & RequestFlagCanCompress) { request->requestFlags &= ~RequestFlagCanCompress; NativeByteBuffer *original = BuffersStorage::getInstance().getFreeBuffer(requestLength); request->rpcRequest->serializeToStream(original); NativeByteBuffer *buffer = compressGZip(original); if (buffer != nullptr) { TL_gzip_packed *packed = new TL_gzip_packed(); packed->originalRequest = std::move(request->rpcRequest); packed->packed_data_to_send = buffer; request->rpcRequest = std::unique_ptr(packed); requestLength = packed->getObjectSize(); } original->reuse(); } request->messageId = generateMessageId(); request->serializedLength = requestLength; request->messageSeqNo = connection->generateMessageSeqNo(true); request->startTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000); request->connectionToken = connection->getConnectionToken(); NetworkMessage *networkMessage = new NetworkMessage(); networkMessage->message = std::unique_ptr(new TL_message()); networkMessage->message->msg_id = request->messageId; networkMessage->message->bytes = request->serializedLength; networkMessage->message->outgoingBody = request->getRpcRequest(); networkMessage->message->seqno = request->messageSeqNo; networkMessage->requestId = request->requestToken; networkMessage->invokeAfter = (request->requestFlags & RequestFlagInvokeAfter) != 0; networkMessage->needQuickAck = (request->requestFlags & RequestFlagNeedQuickAck) != 0; runningRequests.push_back(std::move(*iter)); switch (request->connectionType & 0x0000ffff) { case ConnectionTypeGeneric: addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters); break; case ConnectionTypeTemp: addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, tempMessagesToDatacenters); break; case ConnectionTypeDownload: case ConnectionTypeUpload: { std::vector> array; array.push_back(std::unique_ptr(networkMessage)); sendMessagesToConnectionWithConfirmation(array, connection, false); break; } default: delete networkMessage; } iter = requestsQueue.erase(iter); } for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { Datacenter *datacenter = iter->second; std::map>>::iterator iter2 = genericMessagesToDatacenters.find(datacenter->getDatacenterId()); if (iter2 == genericMessagesToDatacenters.end()) { Connection *connection = datacenter->getGenericConnection(false); if (connection != nullptr && connection->getConnectionToken() != 0 && connection->hasMessagesToConfirm()) { genericMessagesToDatacenters[datacenter->getDatacenterId()] = std::vector>(); } } iter2 = tempMessagesToDatacenters.find(datacenter->getDatacenterId()); if (iter2 == tempMessagesToDatacenters.end()) { Connection *connection = datacenter->getTempConnection(false); if (connection != nullptr && connection->getConnectionToken() != 0 && connection->hasMessagesToConfirm()) { tempMessagesToDatacenters[datacenter->getDatacenterId()] = std::vector>(); } } } for (std::map>>::iterator iter = genericMessagesToDatacenters.begin(); iter != genericMessagesToDatacenters.end(); iter++) { Datacenter *datacenter = getDatacenterWithId(iter->first); if (datacenter != nullptr) { bool scannedPreviousRequests = false; int64_t lastSentMessageRpcId = 0; bool needQuickAck = false; std::vector> &array = iter->second; size_t count = array.size(); for (uint32_t b = 0; b < count; b++) { NetworkMessage *networkMessage = array[b].get(); if (networkMessage->needQuickAck) { needQuickAck = true; } if (networkMessage->invokeAfter) { if (!scannedPreviousRequests) { scannedPreviousRequests = true; std::vector currentRequests; for (uint32_t a = 0; a < count; a++) { NetworkMessage *currentNetworkMessage = array[a].get(); TL_message *currentMessage = currentNetworkMessage->message.get(); if (currentNetworkMessage->invokeAfter) { currentRequests.push_back(currentMessage->msg_id); } } int64_t maxRequestId = 0; for (requestsIter iter2 = runningRequests.begin(); iter2 != runningRequests.end(); iter2++) { Request *request = iter2->get(); if (request->requestFlags & RequestFlagInvokeAfter) { if (request->messageId > maxRequestId && std::find(currentRequests.begin(), currentRequests.end(), request->messageId) == currentRequests.end()) { maxRequestId = request->messageId; } } } lastSentMessageRpcId = maxRequestId; } TL_message *message = networkMessage->message.get(); if (lastSentMessageRpcId != 0 && lastSentMessageRpcId != message->msg_id) { TL_invokeAfterMsg *request = new TL_invokeAfterMsg(); request->msg_id = lastSentMessageRpcId; if (message->outgoingBody != nullptr) { DEBUG_D("wrap outgoingBody(%p, %s) to TL_invokeAfterMsg", message->outgoingBody, typeid(*message->outgoingBody).name()); request->outgoingQuery = message->outgoingBody; message->outgoingBody = nullptr; } else { DEBUG_D("wrap body(%p, %s) to TL_invokeAfterMsg", message->body.get(), typeid(*(message->body.get())).name()); request->query = std::move(message->body); } message->body = std::unique_ptr(request); message->bytes += 4 + 8; } lastSentMessageRpcId = message->msg_id; } } sendMessagesToConnectionWithConfirmation(array, datacenter->getGenericConnection(true), needQuickAck); } } for (std::map>>::iterator iter = tempMessagesToDatacenters.begin(); iter != tempMessagesToDatacenters.end(); iter++) { Datacenter *datacenter = getDatacenterWithId(iter->first); if (datacenter != nullptr) { std::vector> &array = iter->second; sendMessagesToConnectionWithConfirmation(array, datacenter->getTempConnection(true), false); } } if (connectionTypes == ConnectionTypeGeneric && dc == currentDatacenterId) { std::map>>::iterator iter2 = genericMessagesToDatacenters.find(currentDatacenterId); if (iter2 == genericMessagesToDatacenters.end()) { sendPing(getDatacenterWithId(currentDatacenterId), false); } } if (!unknownDatacenterIds.empty()) { updateDcSettings(0, false); } size_t count = neededDatacenters.size(); for (uint32_t a = 0; a < count; a++) { Datacenter *datacenter = neededDatacenters[a]; if (datacenter->getDatacenterId() != movingToDatacenterId && !datacenter->isHandshaking() && !datacenter->hasAuthKey()) { datacenter->beginHandshake(true); } } if (currentUserId) { count = unauthorizedDatacenters.size(); for (uint32_t a = 0; a < count; a++) { Datacenter *datacenter = unauthorizedDatacenters[a]; uint32_t id = datacenter->getDatacenterId(); if (id != currentDatacenterId && id != movingToDatacenterId && !datacenter->isExportingAuthorization()) { datacenter->exportAuthorization(); } } } } Datacenter *ConnectionsManager::getDatacenterWithId(uint32_t datacenterId) { if (datacenterId == DEFAULT_DATACENTER_ID) { return datacenters[currentDatacenterId]; } std::map::iterator iter = datacenters.find(datacenterId); return iter != datacenters.end() ? iter->second : nullptr; } std::unique_ptr ConnectionsManager::wrapInLayer(TLObject *object, Datacenter *datacenter, Request *baseRequest) { if (object->isNeedLayer()) { if (datacenter == nullptr || datacenter->lastInitVersion != currentVersion) { if (datacenter != nullptr && datacenter->getDatacenterId() == currentDatacenterId) { registerForInternalPushUpdates(); } baseRequest->isInitRequest = true; initConnection *request = new initConnection(); request->query = std::unique_ptr(object); request->api_id = currentApiId; request->app_version = currentAppVersion; request->lang_code = currentLangCode; request->system_lang_code = currentLangCode; request->lang_pack = "android"; request->system_lang_code = currentSystemLangCode; if (datacenter == nullptr || datacenter->isCdnDatacenter) { request->device_model = "n/a"; request->system_version = "n/a"; } else { request->device_model = currentDeviceModel; request->system_version = currentSystemVersion; } if (request->lang_code.empty()) { request->lang_code = "en"; } if (request->device_model.empty()) { request->device_model = "n/a"; } if (request->app_version.empty()) { request->app_version = "n/a"; } if (request->system_version.empty()) { request->system_version = "n/a"; } invokeWithLayer *request2 = new invokeWithLayer(); request2->layer = currentLayer; request2->query = std::unique_ptr(request); DEBUG_D("wrap in layer %s", typeid(*object).name()); return std::unique_ptr(request2); } } return std::unique_ptr(object); } void ConnectionsManager::updateDcSettings(uint32_t dcNum, bool workaround) { if (workaround) { if (updatingDcSettingsWorkaround) { return; } updatingDcSettingsWorkaround = true; } else { if (updatingDcSettings) { return; } updatingDcSettings = true; updatingDcStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000); } TL_help_getConfig *request = new TL_help_getConfig(); sendRequest(request, [&, workaround](TLObject *response, TL_error *error, int32_t networkType) { if (!workaround && !updatingDcSettings || workaround && !updatingDcSettingsWorkaround) { return; } if (response != nullptr) { TL_config *config = (TL_config *) response; if (!workaround) { int32_t updateIn = config->expires - getCurrentTime(); if (updateIn <= 0) { updateIn = 120; } lastDcUpdateTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000) - DC_UPDATE_TIME + updateIn; } struct DatacenterInfo { std::vector addressesIpv4; std::vector addressesIpv6; std::vector addressesIpv4Download; std::vector addressesIpv6Download; bool isCdn = false; void addAddressAndPort(TL_dcOption *dcOption) { std::vector *addresses; if (!isCdn) { isCdn = dcOption->cdn; } if (dcOption->media_only) { if (dcOption->ipv6) { addresses = &addressesIpv6Download; } else { addresses = &addressesIpv4Download; } } else { if (dcOption->ipv6) { addresses = &addressesIpv6; } else { addresses = &addressesIpv4; } } for (std::vector::iterator iter = addresses->begin(); iter != addresses->end(); iter++) { if (iter->address == dcOption->ip_address && iter->port == dcOption->port) { return; } } DEBUG_D("getConfig add %s:%d to dc%d", dcOption->ip_address.c_str(), dcOption->port, dcOption->id); addresses->push_back(TcpAddress(dcOption->ip_address, dcOption->port, dcOption->flags)); } }; std::map> map; size_t count = config->dc_options.size(); for (uint32_t a = 0; a < count; a++) { TL_dcOption *dcOption = config->dc_options[a].get(); std::map>::iterator iter = map.find((uint32_t) dcOption->id); DatacenterInfo *info; if (iter == map.end()) { map[dcOption->id] = std::unique_ptr(info = new DatacenterInfo); } else { info = iter->second.get(); } info->addAddressAndPort(dcOption); } if (!map.empty()) { for (std::map>::iterator iter = map.begin(); iter != map.end(); iter++) { Datacenter *datacenter = getDatacenterWithId(iter->first); DatacenterInfo *info = iter->second.get(); if (datacenter == nullptr) { datacenter = new Datacenter(iter->first); datacenters[iter->first] = datacenter; } datacenter->replaceAddresses(info->addressesIpv4, info->isCdn ? 8 : 0); datacenter->replaceAddresses(info->addressesIpv6, info->isCdn ? 9 : 1); datacenter->replaceAddresses(info->addressesIpv4Download, info->isCdn ? 10 : 2); datacenter->replaceAddresses(info->addressesIpv6Download, info->isCdn ? 11 : 3); if (iter->first == movingToDatacenterId) { movingToDatacenterId = DEFAULT_DATACENTER_ID; moveToDatacenter(iter->first); } } saveConfig(); processRequestQueue(AllConnectionTypes, 0); } if (delegate != nullptr) { delegate->onUpdateConfig(config); } } if (workaround) { updatingDcSettingsWorkaround = false; } else { updatingDcSettings = false; } }, nullptr, RequestFlagEnableUnauthorized | RequestFlagWithoutLogin | (workaround ? 0 : RequestFlagTryDifferentDc), dcNum == 0 ? currentDatacenterId : dcNum, workaround ? ConnectionTypeTemp : ConnectionTypeGeneric, true); } void ConnectionsManager::moveToDatacenter(uint32_t datacenterId) { if (movingToDatacenterId == datacenterId) { return; } movingToDatacenterId = datacenterId; Datacenter *currentDatacenter = getDatacenterWithId(currentDatacenterId); clearRequestsForDatacenter(currentDatacenter); if (currentUserId) { TL_auth_exportAuthorization *request = new TL_auth_exportAuthorization(); request->dc_id = datacenterId; sendRequest(request, [&, datacenterId](TLObject *response, TL_error *error, int32_t networkType) { if (error == nullptr) { movingAuthorization = std::move(((TL_auth_exportedAuthorization *) response)->bytes); authorizeOnMovingDatacenter(); } else { moveToDatacenter(datacenterId); } }, nullptr, RequestFlagWithoutLogin, DEFAULT_DATACENTER_ID, ConnectionTypeGeneric, true); } else { authorizeOnMovingDatacenter(); } } void ConnectionsManager::authorizeOnMovingDatacenter() { Datacenter *datacenter = getDatacenterWithId(movingToDatacenterId); if (datacenter == nullptr) { updateDcSettings(0, false); return; } datacenter->recreateSessions(); clearRequestsForDatacenter(datacenter); if (!datacenter->hasAuthKey() && !datacenter->isHandshaking()) { datacenter->clearServerSalts(); datacenter->beginHandshake(true); } if (movingAuthorization != nullptr) { TL_auth_importAuthorization *request = new TL_auth_importAuthorization(); request->id = currentUserId; request->bytes = std::move(movingAuthorization); sendRequest(request, [&](TLObject *response, TL_error *error, int32_t networkType) { if (error == nullptr) { authorizedOnMovingDatacenter(); } else { moveToDatacenter(movingToDatacenterId); } }, nullptr, RequestFlagWithoutLogin, datacenter->getDatacenterId(), ConnectionTypeGeneric, true); } else { authorizedOnMovingDatacenter(); } } void ConnectionsManager::authorizedOnMovingDatacenter() { movingAuthorization.reset(); currentDatacenterId = movingToDatacenterId; movingToDatacenterId = DEFAULT_DATACENTER_ID; saveConfig(); processRequestQueue(0, 0); } void ConnectionsManager::applyDatacenterAddress(uint32_t datacenterId, std::string ipAddress, uint32_t port) { scheduleTask([&, datacenterId, ipAddress, port] { Datacenter *datacenter = getDatacenterWithId(datacenterId); if (datacenter != nullptr) { std::vector addresses; addresses.push_back(TcpAddress(ipAddress, port, 0)); datacenter->suspendConnections(); datacenter->replaceAddresses(addresses, 0); datacenter->resetAddressAndPortNum(); saveConfig(); if (datacenter->isHandshaking()) { datacenter->beginHandshake(true); } updateDcSettings(datacenterId, false); } }); } ConnectionState ConnectionsManager::getConnectionState() { return connectionState; } void ConnectionsManager::setDelegate(ConnectiosManagerDelegate *connectiosManagerDelegate) { delegate = connectiosManagerDelegate; } void ConnectionsManager::setPushConnectionEnabled(bool value) { pushConnectionEnabled = value; Datacenter *datacenter = getDatacenterWithId(currentDatacenterId); if (datacenter != nullptr) { if (!pushConnectionEnabled) { Connection *connection = datacenter->getPushConnection(false); if (connection != nullptr) { connection->suspendConnection(); } } else { datacenter->createPushConnection()->setSessionId(pushSessionId); sendPing(datacenter, true); } } } void ConnectionsManager::applyDnsConfig(NativeByteBuffer *buffer) { scheduleTask([&, buffer] { TL_help_configSimple *config = Datacenter::decodeSimpleConfig(buffer); int currentDate = getCurrentTime(); if (config != nullptr && config->date <= currentDate && currentDate <= config->expires) { Datacenter *datacenter = getDatacenterWithId(config->dc_id); if (datacenter != nullptr) { std::vector addresses; for (std::vector>::iterator iter = config->ip_port_list.begin(); iter != config->ip_port_list.end(); iter++) { TL_ipPort *ipPort = iter->get(); addresses.push_back(TcpAddress(ipPort->ipv4, ipPort->port, 0)); DEBUG_D("got address %s and port %d for dc%d", ipPort->ipv4.c_str(), ipPort->port, config->dc_id); } if (!addresses.empty()) { datacenter->replaceAddresses(addresses, TcpAddressFlagTemp); Connection *connection = datacenter->getTempConnection(false); if (connection != nullptr) { connection->suspendConnection(); } if (datacenter->isHandshaking()) { datacenter->beginHandshake(true); } updateDcSettings(config->dc_id, true); } } delete config; } else if (requestingSecondAddress == 0) { requestingSecondAddress = 1; delegate->onRequestNewServerIpAndPort(requestingSecondAddress); } buffer->reuse(); }); } void ConnectionsManager::init(uint32_t version, int32_t layer, int32_t apiId, std::string deviceModel, std::string systemVersion, std::string appVersion, std::string langCode, std::string systemLangCode, std::string configPath, std::string logPath, int32_t userId, bool isPaused, bool enablePushConnection, bool hasNetwork, int32_t networkType) { currentVersion = version; currentLayer = layer; currentApiId = apiId; currentConfigPath = configPath; currentDeviceModel = deviceModel; currentSystemVersion = systemVersion; currentAppVersion = appVersion; currentLangCode = langCode; currentSystemLangCode = systemLangCode; currentUserId = userId; currentLogPath = logPath; pushConnectionEnabled = enablePushConnection; currentNetworkType = networkType; networkAvailable = hasNetwork; if (isPaused) { lastPauseTime = getCurrentTimeMonotonicMillis(); } if (!currentConfigPath.empty() && currentConfigPath.find_last_of('/') != currentConfigPath.size() - 1) { currentConfigPath += "/"; } if (!logPath.empty()) { FileLog::init(logPath); } loadConfig(); pthread_create(&networkThread, NULL, (ConnectionsManager::ThreadProc), this); } void ConnectionsManager::setProxySettings(std::string address, uint16_t port, std::string username, std::string password) { scheduleTask([&, address, port, username, password] { bool reconnect = proxyAddress != address; proxyAddress = address; proxyPort = port; proxyUser = username; proxyPassword = password; if (reconnect) { for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { iter->second->suspendConnections(); } processRequestQueue(0, 0); } }); } void ConnectionsManager::setLangCode(std::string langCode) { scheduleTask([&, langCode] { if (currentLangCode.compare(langCode) == 0) { return; } currentLangCode = langCode; for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { iter->second->lastInitVersion = 0; } saveConfig(); }); } void ConnectionsManager::resumeNetwork(bool partial) { scheduleTask([&, partial] { if (partial) { if (networkPaused) { lastPauseTime = getCurrentTimeMonotonicMillis(); networkPaused = false; DEBUG_D("wakeup network in background"); } else if (lastPauseTime != 0) { lastPauseTime = getCurrentTimeMonotonicMillis(); networkPaused = false; DEBUG_D("reset sleep timeout"); } } else { DEBUG_D("wakeup network"); lastPauseTime = 0; networkPaused = false; } }); } void ConnectionsManager::pauseNetwork() { if (lastPauseTime != 0) { return; } lastPauseTime = getCurrentTimeMonotonicMillis(); } void ConnectionsManager::setNetworkAvailable(bool value, int32_t type) { scheduleTask([&, value, type] { networkAvailable = value; currentNetworkType = type; if (!networkAvailable) { connectionState = ConnectionStateWaitingForNetwork; } else { for (std::map::iterator iter = datacenters.begin(); iter != datacenters.end(); iter++) { if (iter->second->isHandshaking()) { iter->second->createGenericConnection()->connect(); } } } if (delegate != nullptr) { delegate->onConnectionStateChanged(connectionState); } }); } void ConnectionsManager::setUseIpv6(bool value) { scheduleTask([&, value] { ipv6Enabled = value; }); } void ConnectionsManager::setMtProtoVersion(int version) { mtProtoVersion = version; } int32_t ConnectionsManager::getMtProtoVersion() { return mtProtoVersion; } #ifdef ANDROID void ConnectionsManager::useJavaVM(JavaVM *vm, bool useJavaByteBuffers) { javaVm = vm; if (useJavaByteBuffers) { JNIEnv *env = 0; if (javaVm->GetEnv((void **) &env, JNI_VERSION_1_6) != JNI_OK) { DEBUG_E("can't get jnienv"); exit(1); } jclass_ByteBuffer = (jclass) env->NewGlobalRef(env->FindClass("java/nio/ByteBuffer")); if (jclass_ByteBuffer == 0) { DEBUG_E("can't find java ByteBuffer class"); exit(1); } jclass_ByteBuffer_allocateDirect = env->GetStaticMethodID(jclass_ByteBuffer, "allocateDirect", "(I)Ljava/nio/ByteBuffer;"); if (jclass_ByteBuffer_allocateDirect == 0) { DEBUG_E("can't find java ByteBuffer allocateDirect"); exit(1); } DEBUG_D("using java ByteBuffer"); } } #endif /* public void applyCountryPortNumber(final String phone) { if (phone == null || phone.length() == 0) { return; } Utilities.stageQueue.postRunnable(new Runnable() { @Override public void run() { if (phone.startsWith("968")) { for (HashMap.Entry entry : datacenters.entrySet()) { Datacenter datacenter = entry.getValue(); datacenter.overridePort = 8888; datacenter.suspendConnections(); } } else { for (HashMap.Entry entry : datacenters.entrySet()) { Datacenter datacenter = entry.getValue(); datacenter.overridePort = -1; } } } }); } */