| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- <?php
- namespace Models;
- use Libs\Database;
- class UnipileMessagesModel
- {
- private \PDO $pdo;
- public function __construct()
- {
- $this->pdo = Database::pdo();
- }
- public function storeWebhookEvent(int $integrationId, string $eventType, string $externalId, array $payload, bool $processed): int
- {
- $payloadJson = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
- if ($payloadJson === false) {
- $payloadJson = '{}';
- }
- $stmt = $this->pdo->prepare(
- "INSERT INTO webhook_event (
- integration_id, webhook_event_type, webhook_event_external_id,
- webhook_event_payload, webhook_event_processed, webhook_event_processed_at
- ) VALUES (
- :integration_id, :event_type, :external_id,
- CAST(:payload AS jsonb), :processed, NOW()
- )
- ON CONFLICT (integration_id, webhook_event_type, webhook_event_external_id, webhook_event_deleted_at)
- DO UPDATE SET
- webhook_event_payload = EXCLUDED.webhook_event_payload,
- webhook_event_processed = EXCLUDED.webhook_event_processed,
- webhook_event_processed_at = NOW()
- RETURNING webhook_event_id"
- );
- $stmt->execute([
- 'integration_id' => $integrationId,
- 'event_type' => mb_substr($eventType, 0, 50),
- 'external_id' => $externalId,
- 'payload' => $payloadJson,
- 'processed' => $processed,
- ]);
- return (int) $stmt->fetchColumn();
- }
- public function upsertMessageFromWebhook(array $integration, array $payload): ?array
- {
- $accountType = mb_strtoupper(trim((string) ($payload['account_type'] ?? '')));
- if ($accountType !== 'WHATSAPP') {
- return null;
- }
- $chatId = trim((string) ($payload['chat_id'] ?? ''));
- $messageId = trim((string) ($payload['message_id'] ?? ''));
- if ($chatId === '' || $messageId === '') {
- return null;
- }
- $companyId = (int) ($integration['company_id'] ?? 0);
- $integrationId = (int) ($integration['integration_id'] ?? 0);
- $operatorId = $this->resolveOperatorId($companyId, (int) ($integration['operator_id'] ?? 0));
- if ($companyId <= 0 || $integrationId <= 0 || $operatorId <= 0) {
- return null;
- }
- $sender = is_array($payload['sender'] ?? null) ? $payload['sender'] : [];
- $attendees = is_array($payload['attendees'] ?? null) ? $payload['attendees'] : [];
- $senderProviderId = $this->value($sender, ['attendee_provider_id', 'sender_id', 'id'], 'unknown');
- $isOperator = $this->isOperatorMessage($integration, $payload, $senderProviderId);
- [$clientProviderId, $clientName] = $this->resolveClientIdentity($integration, $payload, $sender, $attendees, $senderProviderId, $isOperator);
- $client = $this->upsertClient($companyId, $clientProviderId, $clientName);
- $messageText = (string) ($payload['message'] ?? $payload['text'] ?? '');
- $sentAt = $this->normalizeTimestamp((string) ($payload['timestamp'] ?? ''));
- $conversation = $this->upsertConversation($companyId, $integrationId, $operatorId, (int) $client['client_id'], $chatId, $payload, $messageText, $isOperator, $sentAt);
- $message = $this->upsertMessage((int) $conversation['conversation_id'], $messageId, $payload, $senderProviderId, $isOperator, $messageText, $sentAt);
- $this->replaceParticipants((int) $conversation['conversation_id'], $attendees);
- $this->replaceAttachments((int) $message['message_id'], is_array($payload['attachments'] ?? null) ? $payload['attachments'] : []);
- return [
- 'conversation_id' => (int) $conversation['conversation_id'],
- 'message_id' => (int) $message['message_id'],
- ];
- }
- public function createOutboundLocalMessage(int $conversationId, array $unipileResponse, string $fallbackText): array
- {
- $messageId = trim((string) ($unipileResponse['id'] ?? $unipileResponse['message_id'] ?? ''));
- if ($messageId === '') {
- $messageId = 'local-' . bin2hex(random_bytes(12));
- }
- $payload = [
- 'provider_id' => $unipileResponse['provider_id'] ?? '',
- 'seen' => $unipileResponse['seen'] ?? false,
- 'delivered' => $unipileResponse['delivered'] ?? true,
- 'edited' => $unipileResponse['edited'] ?? false,
- 'deleted' => $unipileResponse['deleted'] ?? false,
- 'hidden' => $unipileResponse['hidden'] ?? false,
- 'is_event' => $unipileResponse['is_event'] ?? false,
- 'event_type' => $unipileResponse['event_type'] ?? 0,
- ];
- return $this->upsertMessage(
- $conversationId,
- $messageId,
- $payload,
- (string) ($unipileResponse['sender_id'] ?? ''),
- true,
- (string) ($unipileResponse['text'] ?? $unipileResponse['message'] ?? $fallbackText),
- $this->normalizeTimestamp((string) ($unipileResponse['timestamp'] ?? ''))
- );
- }
- public function getConversationForSending(int $companyId, int $conversationId): ?array
- {
- $stmt = $this->pdo->prepare(
- "SELECT c.*, i.integration_account_id, i.integration_is_connected, i.integration_status
- FROM conversation c
- INNER JOIN integration i
- ON i.integration_id = c.integration_id
- AND i.integration_deleted_at = 'infinity'
- WHERE c.company_id = :company_id
- AND c.conversation_id = :conversation_id
- AND c.conversation_deleted_at = 'infinity'
- AND i.integration_provider = 'whatsapp'
- LIMIT 1"
- );
- $stmt->execute([
- 'company_id' => $companyId,
- 'conversation_id' => $conversationId,
- ]);
- $row = $stmt->fetch(\PDO::FETCH_ASSOC);
- return $row === false ? null : $row;
- }
- public function updateConversationAfterOutbound(int $conversationId, string $text): void
- {
- $stmt = $this->pdo->prepare(
- "UPDATE conversation
- SET conversation_last_message_at = NOW(),
- conversation_last_message_preview = :preview,
- conversation_last_message_from = 'operator'
- WHERE conversation_id = :conversation_id
- AND conversation_deleted_at = 'infinity'"
- );
- $stmt->execute([
- 'preview' => mb_substr($text, 0, 500),
- 'conversation_id' => $conversationId,
- ]);
- }
- private function upsertClient(int $companyId, string $providerId, string $name): array
- {
- $providerId = trim($providerId) !== '' ? trim($providerId) : 'unknown';
- $phone = $this->normalizePhone($providerId);
- $name = trim($name) !== '' ? mb_substr(trim($name), 0, 100) : $phone;
- $stmt = $this->pdo->prepare(
- "SELECT *
- FROM client
- WHERE company_id = :company_id
- AND client_deleted_at = 'infinity'
- AND (client_provider_id = :provider_id OR client_phone = :phone)
- ORDER BY CASE WHEN client_provider_id = :provider_id THEN 0 ELSE 1 END
- LIMIT 1"
- );
- $stmt->execute([
- 'company_id' => $companyId,
- 'provider_id' => $providerId,
- 'phone' => $phone,
- ]);
- $existing = $stmt->fetch(\PDO::FETCH_ASSOC);
- if ($existing !== false) {
- $update = $this->pdo->prepare(
- "UPDATE client
- SET client_provider_id = :provider_id,
- client_name = :name
- WHERE client_id = :client_id
- RETURNING *"
- );
- $update->execute([
- 'provider_id' => $providerId,
- 'name' => $name,
- 'client_id' => (int) $existing['client_id'],
- ]);
- return $update->fetch(\PDO::FETCH_ASSOC) ?: $existing;
- }
- $insert = $this->pdo->prepare(
- "INSERT INTO client (
- company_id, client_provider_id, client_phone, client_name,
- client_email, client_segment, client_is_registered
- ) VALUES (
- :company_id, :provider_id, :phone, :name,
- '', '', FALSE
- ) RETURNING *"
- );
- $insert->execute([
- 'company_id' => $companyId,
- 'provider_id' => $providerId,
- 'phone' => $phone,
- 'name' => $name,
- ]);
- return $insert->fetch(\PDO::FETCH_ASSOC) ?: [];
- }
- private function upsertConversation(int $companyId, int $integrationId, int $operatorId, int $clientId, string $chatId, array $payload, string $messageText, bool $isOperator, string $sentAt): array
- {
- $providerChatId = (string) ($payload['chat_provider_id'] ?? $payload['provider_chat_id'] ?? '');
- $lastFrom = $isOperator ? 'operator' : 'client';
- $stmt = $this->pdo->prepare(
- "INSERT INTO conversation (
- company_id, integration_id, operator_id, client_id, conversation_external_id,
- conversation_provider_id, conversation_channel, conversation_status, conversation_is_automated,
- conversation_started_at, conversation_closed_at, conversation_sla_deadline,
- conversation_last_message_at, conversation_last_message_preview, conversation_last_message_from,
- conversation_impact_value, conversation_ticket_value, conversation_conversion_chance,
- conversation_optimum_window
- ) VALUES (
- :company_id, :integration_id, :operator_id, :client_id, :external_id,
- :provider_id, 'whatsapp', 'open', FALSE,
- :sent_at, 'infinity', 'infinity',
- :sent_at, :preview, :last_from,
- 0, 0, 0,
- ''
- )
- ON CONFLICT (integration_id, conversation_external_id, conversation_deleted_at)
- DO UPDATE SET
- operator_id = EXCLUDED.operator_id,
- client_id = EXCLUDED.client_id,
- conversation_provider_id = CASE WHEN EXCLUDED.conversation_provider_id <> '' THEN EXCLUDED.conversation_provider_id ELSE conversation.conversation_provider_id END,
- conversation_last_message_at = GREATEST(conversation.conversation_last_message_at, EXCLUDED.conversation_last_message_at),
- conversation_last_message_preview = EXCLUDED.conversation_last_message_preview,
- conversation_last_message_from = EXCLUDED.conversation_last_message_from
- RETURNING *"
- );
- $stmt->execute([
- 'company_id' => $companyId,
- 'integration_id' => $integrationId,
- 'operator_id' => $operatorId,
- 'client_id' => $clientId,
- 'external_id' => $chatId,
- 'provider_id' => $providerChatId,
- 'sent_at' => $sentAt,
- 'preview' => mb_substr($messageText, 0, 500),
- 'last_from' => $lastFrom,
- ]);
- return $stmt->fetch(\PDO::FETCH_ASSOC) ?: [];
- }
- private function upsertMessage(int $conversationId, string $messageId, array $payload, string $senderProviderId, bool $isOperator, string $text, string $sentAt): array
- {
- $stmt = $this->pdo->prepare(
- "INSERT INTO message (
- conversation_id, quoted_message_id, message_external_id, message_provider_id,
- message_sender_provider_id, message_is_operator, message_type, message_content,
- message_seen, message_delivered, message_edited, message_deleted, message_hidden,
- message_is_event, message_event_type, message_sent_at
- ) VALUES (
- :conversation_id, 0, :external_id, :provider_id,
- :sender_provider_id, :is_operator, :type, :content,
- :seen, :delivered, :edited, :deleted, :hidden,
- :is_event, :event_type, :sent_at
- )
- ON CONFLICT (conversation_id, message_external_id, message_deleted_at)
- DO UPDATE SET
- message_provider_id = EXCLUDED.message_provider_id,
- message_sender_provider_id = EXCLUDED.message_sender_provider_id,
- message_is_operator = EXCLUDED.message_is_operator,
- message_type = EXCLUDED.message_type,
- message_content = EXCLUDED.message_content,
- message_seen = EXCLUDED.message_seen,
- message_delivered = EXCLUDED.message_delivered,
- message_edited = EXCLUDED.message_edited,
- message_deleted = EXCLUDED.message_deleted,
- message_hidden = EXCLUDED.message_hidden,
- message_is_event = EXCLUDED.message_is_event,
- message_event_type = EXCLUDED.message_event_type,
- message_sent_at = EXCLUDED.message_sent_at
- RETURNING *"
- );
- $stmt->execute([
- 'conversation_id' => $conversationId,
- 'external_id' => $messageId,
- 'provider_id' => (string) ($payload['provider_id'] ?? ''),
- 'sender_provider_id' => $senderProviderId,
- 'is_operator' => $isOperator,
- 'type' => mb_substr((string) ($payload['type'] ?? 'text'), 0, 20),
- 'content' => $text,
- 'seen' => (bool) ($payload['seen'] ?? false),
- 'delivered' => (bool) ($payload['delivered'] ?? false),
- 'edited' => (bool) ($payload['edited'] ?? false),
- 'deleted' => (bool) ($payload['deleted'] ?? false),
- 'hidden' => (bool) ($payload['hidden'] ?? false),
- 'is_event' => (bool) ($payload['is_event'] ?? false),
- 'event_type' => (int) ($payload['event_type'] ?? 0),
- 'sent_at' => $sentAt,
- ]);
- return $stmt->fetch(\PDO::FETCH_ASSOC) ?: [];
- }
- private function replaceParticipants(int $conversationId, array $attendees): void
- {
- foreach ($attendees as $attendee) {
- if (!is_array($attendee)) {
- continue;
- }
- $providerId = $this->value($attendee, ['attendee_provider_id', 'id'], '');
- if ($providerId === '') {
- continue;
- }
- $stmt = $this->pdo->prepare(
- "INSERT INTO conversation_participant (
- conversation_id, participant_provider_id, participant_name, participant_is_admin
- ) VALUES (
- :conversation_id, :provider_id, :name, FALSE
- )
- ON CONFLICT (conversation_id, participant_provider_id, participant_deleted_at)
- DO UPDATE SET participant_name = EXCLUDED.participant_name"
- );
- $stmt->execute([
- 'conversation_id' => $conversationId,
- 'provider_id' => $providerId,
- 'name' => mb_substr($this->value($attendee, ['attendee_name', 'name'], $providerId), 0, 100),
- ]);
- }
- }
- private function replaceAttachments(int $messageId, array $attachments): void
- {
- foreach ($attachments as $attachment) {
- if (!is_array($attachment)) {
- continue;
- }
- $externalId = trim((string) ($attachment['id'] ?? ''));
- if ($externalId === '') {
- continue;
- }
- $stmt = $this->pdo->prepare(
- "INSERT INTO message_attachment (
- message_id, attachment_external_id, attachment_url, attachment_type,
- attachment_mime_type, attachment_file_name, attachment_size
- ) VALUES (
- :message_id, :external_id, :url, :type,
- :mime_type, :file_name, :size
- )
- ON CONFLICT (message_id, attachment_external_id, attachment_deleted_at)
- DO UPDATE SET
- attachment_url = EXCLUDED.attachment_url,
- attachment_type = EXCLUDED.attachment_type,
- attachment_mime_type = EXCLUDED.attachment_mime_type,
- attachment_file_name = EXCLUDED.attachment_file_name,
- attachment_size = EXCLUDED.attachment_size"
- );
- $stmt->execute([
- 'message_id' => $messageId,
- 'external_id' => $externalId,
- 'url' => (string) ($attachment['url'] ?? ''),
- 'type' => mb_substr((string) ($attachment['type'] ?? ''), 0, 50),
- 'mime_type' => mb_substr((string) ($attachment['mimetype'] ?? $attachment['mime_type'] ?? ''), 0, 100),
- 'file_name' => mb_substr((string) ($attachment['file_name'] ?? $externalId), 0, 255),
- 'size' => $this->attachmentSize($attachment),
- ]);
- }
- }
- private function resolveOperatorId(int $companyId, int $preferredOperatorId): int
- {
- if ($preferredOperatorId > 0) {
- $stmt = $this->pdo->prepare(
- "SELECT operator_id
- FROM operator
- WHERE company_id = :company_id
- AND operator_id = :operator_id
- AND operator_deleted_at = 'infinity'
- LIMIT 1"
- );
- $stmt->execute([
- 'company_id' => $companyId,
- 'operator_id' => $preferredOperatorId,
- ]);
- if ($stmt->fetchColumn() !== false) {
- return $preferredOperatorId;
- }
- }
- $stmt = $this->pdo->prepare(
- "SELECT operator_id
- FROM operator
- WHERE company_id = :company_id
- AND operator_deleted_at = 'infinity'
- ORDER BY operator_id ASC
- LIMIT 1"
- );
- $stmt->execute(['company_id' => $companyId]);
- $operatorId = $stmt->fetchColumn();
- return $operatorId === false ? 0 : (int) $operatorId;
- }
- private function isOperatorMessage(array $integration, array $payload, string $senderProviderId): bool
- {
- if (isset($payload['is_sender'])) {
- return (bool) $payload['is_sender'];
- }
- $accountInfo = is_array($payload['account_info'] ?? null) ? $payload['account_info'] : [];
- $accountUserId = (string) ($accountInfo['user_id'] ?? $integration['integration_external_account_id'] ?? '');
- return $accountUserId !== '' && $senderProviderId !== '' && $accountUserId === $senderProviderId;
- }
- private function normalizePhone(string $providerId): string
- {
- $digits = preg_replace('/\D+/', '', $providerId) ?? '';
- if ($digits !== '') {
- return mb_substr($digits, 0, 20);
- }
- return mb_substr((string) abs(crc32($providerId)), 0, 20);
- }
- private function normalizeTimestamp(string $timestamp): string
- {
- $time = strtotime($timestamp);
- if ($time === false) {
- $time = time();
- }
- return gmdate('Y-m-d H:i:s', $time);
- }
- private function resolveClientIdentity(array $integration, array $payload, array $sender, array $attendees, string $senderProviderId, bool $isOperator): array
- {
- $senderName = $this->value($sender, ['attendee_name', 'name'], $senderProviderId);
- if (!$isOperator) {
- return [$senderProviderId, $senderName];
- }
- $accountInfo = is_array($payload['account_info'] ?? null) ? $payload['account_info'] : [];
- $accountUserId = (string) ($accountInfo['user_id'] ?? $integration['integration_external_account_id'] ?? '');
- foreach ($attendees as $attendee) {
- if (!is_array($attendee)) {
- continue;
- }
- $providerId = $this->value($attendee, ['attendee_provider_id', 'id'], '');
- if ($providerId === '' || $providerId === $accountUserId) {
- continue;
- }
- return [
- $providerId,
- $this->value($attendee, ['attendee_name', 'name'], $providerId),
- ];
- }
- return [$senderProviderId, $senderName];
- }
- private function attachmentSize(array $attachment): int
- {
- $size = $attachment['size'] ?? 0;
- if (is_array($size)) {
- return (int) ($size['bytes'] ?? $size['file_size'] ?? 0);
- }
- if (is_numeric($size)) {
- return (int) $size;
- }
- return 0;
- }
- private function value(array $data, array $keys, string $default): string
- {
- foreach ($keys as $key) {
- if (isset($data[$key]) && trim((string) $data[$key]) !== '') {
- return trim((string) $data[$key]);
- }
- }
- return $default;
- }
- }
|