UnipileMessagesModel.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. <?php
  2. namespace Models;
  3. use Libs\Database;
  4. class UnipileMessagesModel
  5. {
  6. private \PDO $pdo;
  7. public function __construct()
  8. {
  9. $this->pdo = Database::pdo();
  10. }
  11. public function storeWebhookEvent(int $integrationId, string $eventType, string $externalId, array $payload, bool $processed): int
  12. {
  13. $payloadJson = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
  14. if ($payloadJson === false) {
  15. $payloadJson = '{}';
  16. }
  17. $stmt = $this->pdo->prepare(
  18. "INSERT INTO webhook_event (
  19. integration_id, webhook_event_type, webhook_event_external_id,
  20. webhook_event_payload, webhook_event_processed, webhook_event_processed_at
  21. ) VALUES (
  22. :integration_id, :event_type, :external_id,
  23. CAST(:payload AS jsonb), :processed, NOW()
  24. )
  25. ON CONFLICT (integration_id, webhook_event_type, webhook_event_external_id, webhook_event_deleted_at)
  26. DO UPDATE SET
  27. webhook_event_payload = EXCLUDED.webhook_event_payload,
  28. webhook_event_processed = EXCLUDED.webhook_event_processed,
  29. webhook_event_processed_at = NOW()
  30. RETURNING webhook_event_id"
  31. );
  32. $stmt->execute([
  33. 'integration_id' => $integrationId,
  34. 'event_type' => mb_substr($eventType, 0, 50),
  35. 'external_id' => $externalId,
  36. 'payload' => $payloadJson,
  37. 'processed' => $processed,
  38. ]);
  39. return (int) $stmt->fetchColumn();
  40. }
  41. public function upsertMessageFromWebhook(array $integration, array $payload): ?array
  42. {
  43. $accountType = mb_strtoupper(trim((string) ($payload['account_type'] ?? '')));
  44. if ($accountType !== 'WHATSAPP') {
  45. return null;
  46. }
  47. $chatId = trim((string) ($payload['chat_id'] ?? ''));
  48. $messageId = trim((string) ($payload['message_id'] ?? ''));
  49. if ($chatId === '' || $messageId === '') {
  50. return null;
  51. }
  52. $companyId = (int) ($integration['company_id'] ?? 0);
  53. $integrationId = (int) ($integration['integration_id'] ?? 0);
  54. $operatorId = $this->resolveOperatorId($companyId, (int) ($integration['operator_id'] ?? 0));
  55. if ($companyId <= 0 || $integrationId <= 0 || $operatorId <= 0) {
  56. return null;
  57. }
  58. $sender = is_array($payload['sender'] ?? null) ? $payload['sender'] : [];
  59. $attendees = is_array($payload['attendees'] ?? null) ? $payload['attendees'] : [];
  60. $senderProviderId = $this->value($sender, ['attendee_provider_id', 'sender_id', 'id'], 'unknown');
  61. $isOperator = $this->isOperatorMessage($integration, $payload, $senderProviderId);
  62. [$clientProviderId, $clientName] = $this->resolveClientIdentity($integration, $payload, $sender, $attendees, $senderProviderId, $isOperator);
  63. $client = $this->upsertClient($companyId, $clientProviderId, $clientName);
  64. $messageText = (string) ($payload['message'] ?? $payload['text'] ?? '');
  65. $sentAt = $this->normalizeTimestamp((string) ($payload['timestamp'] ?? ''));
  66. $conversation = $this->upsertConversation($companyId, $integrationId, $operatorId, (int) $client['client_id'], $chatId, $payload, $messageText, $isOperator, $sentAt);
  67. $message = $this->upsertMessage((int) $conversation['conversation_id'], $messageId, $payload, $senderProviderId, $isOperator, $messageText, $sentAt);
  68. $this->replaceParticipants((int) $conversation['conversation_id'], $attendees);
  69. $this->replaceAttachments((int) $message['message_id'], is_array($payload['attachments'] ?? null) ? $payload['attachments'] : []);
  70. return [
  71. 'conversation_id' => (int) $conversation['conversation_id'],
  72. 'message_id' => (int) $message['message_id'],
  73. ];
  74. }
  75. public function createOutboundLocalMessage(int $conversationId, array $unipileResponse, string $fallbackText): array
  76. {
  77. $messageId = trim((string) ($unipileResponse['id'] ?? $unipileResponse['message_id'] ?? ''));
  78. if ($messageId === '') {
  79. $messageId = 'local-' . bin2hex(random_bytes(12));
  80. }
  81. $payload = [
  82. 'provider_id' => $unipileResponse['provider_id'] ?? '',
  83. 'seen' => $unipileResponse['seen'] ?? false,
  84. 'delivered' => $unipileResponse['delivered'] ?? true,
  85. 'edited' => $unipileResponse['edited'] ?? false,
  86. 'deleted' => $unipileResponse['deleted'] ?? false,
  87. 'hidden' => $unipileResponse['hidden'] ?? false,
  88. 'is_event' => $unipileResponse['is_event'] ?? false,
  89. 'event_type' => $unipileResponse['event_type'] ?? 0,
  90. ];
  91. return $this->upsertMessage(
  92. $conversationId,
  93. $messageId,
  94. $payload,
  95. (string) ($unipileResponse['sender_id'] ?? ''),
  96. true,
  97. (string) ($unipileResponse['text'] ?? $unipileResponse['message'] ?? $fallbackText),
  98. $this->normalizeTimestamp((string) ($unipileResponse['timestamp'] ?? ''))
  99. );
  100. }
  101. public function getConversationForSending(int $companyId, int $conversationId): ?array
  102. {
  103. $stmt = $this->pdo->prepare(
  104. "SELECT c.*, i.integration_account_id, i.integration_is_connected, i.integration_status
  105. FROM conversation c
  106. INNER JOIN integration i
  107. ON i.integration_id = c.integration_id
  108. AND i.integration_deleted_at = 'infinity'
  109. WHERE c.company_id = :company_id
  110. AND c.conversation_id = :conversation_id
  111. AND c.conversation_deleted_at = 'infinity'
  112. AND i.integration_provider = 'whatsapp'
  113. LIMIT 1"
  114. );
  115. $stmt->execute([
  116. 'company_id' => $companyId,
  117. 'conversation_id' => $conversationId,
  118. ]);
  119. $row = $stmt->fetch(\PDO::FETCH_ASSOC);
  120. return $row === false ? null : $row;
  121. }
  122. public function updateConversationAfterOutbound(int $conversationId, string $text): void
  123. {
  124. $stmt = $this->pdo->prepare(
  125. "UPDATE conversation
  126. SET conversation_last_message_at = NOW(),
  127. conversation_last_message_preview = :preview,
  128. conversation_last_message_from = 'operator'
  129. WHERE conversation_id = :conversation_id
  130. AND conversation_deleted_at = 'infinity'"
  131. );
  132. $stmt->execute([
  133. 'preview' => mb_substr($text, 0, 500),
  134. 'conversation_id' => $conversationId,
  135. ]);
  136. }
  137. private function upsertClient(int $companyId, string $providerId, string $name): array
  138. {
  139. $providerId = trim($providerId) !== '' ? trim($providerId) : 'unknown';
  140. $phone = $this->normalizePhone($providerId);
  141. $name = trim($name) !== '' ? mb_substr(trim($name), 0, 100) : $phone;
  142. $stmt = $this->pdo->prepare(
  143. "SELECT *
  144. FROM client
  145. WHERE company_id = :company_id
  146. AND client_deleted_at = 'infinity'
  147. AND (client_provider_id = :provider_id OR client_phone = :phone)
  148. ORDER BY CASE WHEN client_provider_id = :provider_id THEN 0 ELSE 1 END
  149. LIMIT 1"
  150. );
  151. $stmt->execute([
  152. 'company_id' => $companyId,
  153. 'provider_id' => $providerId,
  154. 'phone' => $phone,
  155. ]);
  156. $existing = $stmt->fetch(\PDO::FETCH_ASSOC);
  157. if ($existing !== false) {
  158. $update = $this->pdo->prepare(
  159. "UPDATE client
  160. SET client_provider_id = :provider_id,
  161. client_name = :name
  162. WHERE client_id = :client_id
  163. RETURNING *"
  164. );
  165. $update->execute([
  166. 'provider_id' => $providerId,
  167. 'name' => $name,
  168. 'client_id' => (int) $existing['client_id'],
  169. ]);
  170. return $update->fetch(\PDO::FETCH_ASSOC) ?: $existing;
  171. }
  172. $insert = $this->pdo->prepare(
  173. "INSERT INTO client (
  174. company_id, client_provider_id, client_phone, client_name,
  175. client_email, client_segment, client_is_registered
  176. ) VALUES (
  177. :company_id, :provider_id, :phone, :name,
  178. '', '', FALSE
  179. ) RETURNING *"
  180. );
  181. $insert->execute([
  182. 'company_id' => $companyId,
  183. 'provider_id' => $providerId,
  184. 'phone' => $phone,
  185. 'name' => $name,
  186. ]);
  187. return $insert->fetch(\PDO::FETCH_ASSOC) ?: [];
  188. }
  189. private function upsertConversation(int $companyId, int $integrationId, int $operatorId, int $clientId, string $chatId, array $payload, string $messageText, bool $isOperator, string $sentAt): array
  190. {
  191. $providerChatId = (string) ($payload['chat_provider_id'] ?? $payload['provider_chat_id'] ?? '');
  192. $lastFrom = $isOperator ? 'operator' : 'client';
  193. $stmt = $this->pdo->prepare(
  194. "INSERT INTO conversation (
  195. company_id, integration_id, operator_id, client_id, conversation_external_id,
  196. conversation_provider_id, conversation_channel, conversation_status, conversation_is_automated,
  197. conversation_started_at, conversation_closed_at, conversation_sla_deadline,
  198. conversation_last_message_at, conversation_last_message_preview, conversation_last_message_from,
  199. conversation_impact_value, conversation_ticket_value, conversation_conversion_chance,
  200. conversation_optimum_window
  201. ) VALUES (
  202. :company_id, :integration_id, :operator_id, :client_id, :external_id,
  203. :provider_id, 'whatsapp', 'open', FALSE,
  204. :sent_at, 'infinity', 'infinity',
  205. :sent_at, :preview, :last_from,
  206. 0, 0, 0,
  207. ''
  208. )
  209. ON CONFLICT (integration_id, conversation_external_id, conversation_deleted_at)
  210. DO UPDATE SET
  211. operator_id = EXCLUDED.operator_id,
  212. client_id = EXCLUDED.client_id,
  213. conversation_provider_id = CASE WHEN EXCLUDED.conversation_provider_id <> '' THEN EXCLUDED.conversation_provider_id ELSE conversation.conversation_provider_id END,
  214. conversation_last_message_at = GREATEST(conversation.conversation_last_message_at, EXCLUDED.conversation_last_message_at),
  215. conversation_last_message_preview = EXCLUDED.conversation_last_message_preview,
  216. conversation_last_message_from = EXCLUDED.conversation_last_message_from
  217. RETURNING *"
  218. );
  219. $stmt->execute([
  220. 'company_id' => $companyId,
  221. 'integration_id' => $integrationId,
  222. 'operator_id' => $operatorId,
  223. 'client_id' => $clientId,
  224. 'external_id' => $chatId,
  225. 'provider_id' => $providerChatId,
  226. 'sent_at' => $sentAt,
  227. 'preview' => mb_substr($messageText, 0, 500),
  228. 'last_from' => $lastFrom,
  229. ]);
  230. return $stmt->fetch(\PDO::FETCH_ASSOC) ?: [];
  231. }
  232. private function upsertMessage(int $conversationId, string $messageId, array $payload, string $senderProviderId, bool $isOperator, string $text, string $sentAt): array
  233. {
  234. $stmt = $this->pdo->prepare(
  235. "INSERT INTO message (
  236. conversation_id, quoted_message_id, message_external_id, message_provider_id,
  237. message_sender_provider_id, message_is_operator, message_type, message_content,
  238. message_seen, message_delivered, message_edited, message_deleted, message_hidden,
  239. message_is_event, message_event_type, message_sent_at
  240. ) VALUES (
  241. :conversation_id, 0, :external_id, :provider_id,
  242. :sender_provider_id, :is_operator, :type, :content,
  243. :seen, :delivered, :edited, :deleted, :hidden,
  244. :is_event, :event_type, :sent_at
  245. )
  246. ON CONFLICT (conversation_id, message_external_id, message_deleted_at)
  247. DO UPDATE SET
  248. message_provider_id = EXCLUDED.message_provider_id,
  249. message_sender_provider_id = EXCLUDED.message_sender_provider_id,
  250. message_is_operator = EXCLUDED.message_is_operator,
  251. message_type = EXCLUDED.message_type,
  252. message_content = EXCLUDED.message_content,
  253. message_seen = EXCLUDED.message_seen,
  254. message_delivered = EXCLUDED.message_delivered,
  255. message_edited = EXCLUDED.message_edited,
  256. message_deleted = EXCLUDED.message_deleted,
  257. message_hidden = EXCLUDED.message_hidden,
  258. message_is_event = EXCLUDED.message_is_event,
  259. message_event_type = EXCLUDED.message_event_type,
  260. message_sent_at = EXCLUDED.message_sent_at
  261. RETURNING *"
  262. );
  263. $stmt->execute([
  264. 'conversation_id' => $conversationId,
  265. 'external_id' => $messageId,
  266. 'provider_id' => (string) ($payload['provider_id'] ?? ''),
  267. 'sender_provider_id' => $senderProviderId,
  268. 'is_operator' => $isOperator,
  269. 'type' => mb_substr((string) ($payload['type'] ?? 'text'), 0, 20),
  270. 'content' => $text,
  271. 'seen' => (bool) ($payload['seen'] ?? false),
  272. 'delivered' => (bool) ($payload['delivered'] ?? false),
  273. 'edited' => (bool) ($payload['edited'] ?? false),
  274. 'deleted' => (bool) ($payload['deleted'] ?? false),
  275. 'hidden' => (bool) ($payload['hidden'] ?? false),
  276. 'is_event' => (bool) ($payload['is_event'] ?? false),
  277. 'event_type' => (int) ($payload['event_type'] ?? 0),
  278. 'sent_at' => $sentAt,
  279. ]);
  280. return $stmt->fetch(\PDO::FETCH_ASSOC) ?: [];
  281. }
  282. private function replaceParticipants(int $conversationId, array $attendees): void
  283. {
  284. foreach ($attendees as $attendee) {
  285. if (!is_array($attendee)) {
  286. continue;
  287. }
  288. $providerId = $this->value($attendee, ['attendee_provider_id', 'id'], '');
  289. if ($providerId === '') {
  290. continue;
  291. }
  292. $stmt = $this->pdo->prepare(
  293. "INSERT INTO conversation_participant (
  294. conversation_id, participant_provider_id, participant_name, participant_is_admin
  295. ) VALUES (
  296. :conversation_id, :provider_id, :name, FALSE
  297. )
  298. ON CONFLICT (conversation_id, participant_provider_id, participant_deleted_at)
  299. DO UPDATE SET participant_name = EXCLUDED.participant_name"
  300. );
  301. $stmt->execute([
  302. 'conversation_id' => $conversationId,
  303. 'provider_id' => $providerId,
  304. 'name' => mb_substr($this->value($attendee, ['attendee_name', 'name'], $providerId), 0, 100),
  305. ]);
  306. }
  307. }
  308. private function replaceAttachments(int $messageId, array $attachments): void
  309. {
  310. foreach ($attachments as $attachment) {
  311. if (!is_array($attachment)) {
  312. continue;
  313. }
  314. $externalId = trim((string) ($attachment['id'] ?? ''));
  315. if ($externalId === '') {
  316. continue;
  317. }
  318. $stmt = $this->pdo->prepare(
  319. "INSERT INTO message_attachment (
  320. message_id, attachment_external_id, attachment_url, attachment_type,
  321. attachment_mime_type, attachment_file_name, attachment_size
  322. ) VALUES (
  323. :message_id, :external_id, :url, :type,
  324. :mime_type, :file_name, :size
  325. )
  326. ON CONFLICT (message_id, attachment_external_id, attachment_deleted_at)
  327. DO UPDATE SET
  328. attachment_url = EXCLUDED.attachment_url,
  329. attachment_type = EXCLUDED.attachment_type,
  330. attachment_mime_type = EXCLUDED.attachment_mime_type,
  331. attachment_file_name = EXCLUDED.attachment_file_name,
  332. attachment_size = EXCLUDED.attachment_size"
  333. );
  334. $stmt->execute([
  335. 'message_id' => $messageId,
  336. 'external_id' => $externalId,
  337. 'url' => (string) ($attachment['url'] ?? ''),
  338. 'type' => mb_substr((string) ($attachment['type'] ?? ''), 0, 50),
  339. 'mime_type' => mb_substr((string) ($attachment['mimetype'] ?? $attachment['mime_type'] ?? ''), 0, 100),
  340. 'file_name' => mb_substr((string) ($attachment['file_name'] ?? $externalId), 0, 255),
  341. 'size' => $this->attachmentSize($attachment),
  342. ]);
  343. }
  344. }
  345. private function resolveOperatorId(int $companyId, int $preferredOperatorId): int
  346. {
  347. if ($preferredOperatorId > 0) {
  348. $stmt = $this->pdo->prepare(
  349. "SELECT operator_id
  350. FROM operator
  351. WHERE company_id = :company_id
  352. AND operator_id = :operator_id
  353. AND operator_deleted_at = 'infinity'
  354. LIMIT 1"
  355. );
  356. $stmt->execute([
  357. 'company_id' => $companyId,
  358. 'operator_id' => $preferredOperatorId,
  359. ]);
  360. if ($stmt->fetchColumn() !== false) {
  361. return $preferredOperatorId;
  362. }
  363. }
  364. $stmt = $this->pdo->prepare(
  365. "SELECT operator_id
  366. FROM operator
  367. WHERE company_id = :company_id
  368. AND operator_deleted_at = 'infinity'
  369. ORDER BY operator_id ASC
  370. LIMIT 1"
  371. );
  372. $stmt->execute(['company_id' => $companyId]);
  373. $operatorId = $stmt->fetchColumn();
  374. return $operatorId === false ? 0 : (int) $operatorId;
  375. }
  376. private function isOperatorMessage(array $integration, array $payload, string $senderProviderId): bool
  377. {
  378. if (isset($payload['is_sender'])) {
  379. return (bool) $payload['is_sender'];
  380. }
  381. $accountInfo = is_array($payload['account_info'] ?? null) ? $payload['account_info'] : [];
  382. $accountUserId = (string) ($accountInfo['user_id'] ?? $integration['integration_external_account_id'] ?? '');
  383. return $accountUserId !== '' && $senderProviderId !== '' && $accountUserId === $senderProviderId;
  384. }
  385. private function normalizePhone(string $providerId): string
  386. {
  387. $digits = preg_replace('/\D+/', '', $providerId) ?? '';
  388. if ($digits !== '') {
  389. return mb_substr($digits, 0, 20);
  390. }
  391. return mb_substr((string) abs(crc32($providerId)), 0, 20);
  392. }
  393. private function normalizeTimestamp(string $timestamp): string
  394. {
  395. $time = strtotime($timestamp);
  396. if ($time === false) {
  397. $time = time();
  398. }
  399. return gmdate('Y-m-d H:i:s', $time);
  400. }
  401. private function resolveClientIdentity(array $integration, array $payload, array $sender, array $attendees, string $senderProviderId, bool $isOperator): array
  402. {
  403. $senderName = $this->value($sender, ['attendee_name', 'name'], $senderProviderId);
  404. if (!$isOperator) {
  405. return [$senderProviderId, $senderName];
  406. }
  407. $accountInfo = is_array($payload['account_info'] ?? null) ? $payload['account_info'] : [];
  408. $accountUserId = (string) ($accountInfo['user_id'] ?? $integration['integration_external_account_id'] ?? '');
  409. foreach ($attendees as $attendee) {
  410. if (!is_array($attendee)) {
  411. continue;
  412. }
  413. $providerId = $this->value($attendee, ['attendee_provider_id', 'id'], '');
  414. if ($providerId === '' || $providerId === $accountUserId) {
  415. continue;
  416. }
  417. return [
  418. $providerId,
  419. $this->value($attendee, ['attendee_name', 'name'], $providerId),
  420. ];
  421. }
  422. return [$senderProviderId, $senderName];
  423. }
  424. private function attachmentSize(array $attachment): int
  425. {
  426. $size = $attachment['size'] ?? 0;
  427. if (is_array($size)) {
  428. return (int) ($size['bytes'] ?? $size['file_size'] ?? 0);
  429. }
  430. if (is_numeric($size)) {
  431. return (int) $size;
  432. }
  433. return 0;
  434. }
  435. private function value(array $data, array $keys, string $default): string
  436. {
  437. foreach ($keys as $key) {
  438. if (isset($data[$key]) && trim((string) $data[$key]) !== '') {
  439. return trim((string) $data[$key]);
  440. }
  441. }
  442. return $default;
  443. }
  444. }