Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 139 additions & 60 deletions src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ export class ChatwootService {
private readonly cache: CacheService,
) {}

private pgClient = postgresClient.getChatwootConnection();
private async getPgClient() {
return postgresClient.getChatwootConnection();
}

private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
const cacheKey = `${instance.instanceName}:getProvider`;
Expand Down Expand Up @@ -382,7 +384,8 @@ export class ChatwootService {
if (!uri) return false;

const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0];
const pgClient = await this.getPgClient();
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
let tagId = tagData?.id;
const taggingsCount = tagData?.taggings_count || 0;

Expand All @@ -392,18 +395,18 @@ export class ChatwootService {
DO UPDATE SET taggings_count = tags.taggings_count + 1
RETURNING id`;

tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;

const sqlCheckTagging = `SELECT 1 FROM taggings
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;

const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;

if (!taggingExists) {
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
VALUES ($1, 'Contact', $2, 'labels', NOW())`;

await this.pgClient.query(sqlInsertLabel, [tagId, contactId]);
await pgClient.query(sqlInsertLabel, [tagId, contactId]);
}

return true;
Expand Down Expand Up @@ -861,6 +864,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
const client = await this.clientCw(instance);

Expand All @@ -869,32 +873,86 @@ export class ChatwootService {
return null;
}

const replyToIds = await this.getReplyToIds(messageBody, instance);
const doCreateMessage = async (convId: number) => {
const replyToIds = await this.getReplyToIds(messageBody, instance);

const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;

const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: conversationId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: convId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
});
});

if (!message) {
this.logger.warn('message not found');
return null;
if (!message) {
this.logger.warn('message not found');
return null;
}

return message;
};

try {
return await doCreateMessage(conversationId);
} catch (error) {
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'createMessage',
(newConvId) => doCreateMessage(newConvId),
);
}
}

return message;
private async handleStaleConversationError(
error: any,
instance: InstanceDto,
conversationId: number,
messageBody: any,
messageBodyForRetry: any,
functionName: string,
originalFunction: (newConversationId: number) => Promise<any>,
) {
if (axios.isAxiosError(error) && error.response?.status === 404) {
this.logger.warn(
`Conversation ${conversationId} not found in Chatwoot. Retrying operation from ${functionName}...`,
);
const bodyForRetry = messageBodyForRetry || messageBody;

if (!bodyForRetry || !bodyForRetry.key?.remoteJid) {
this.logger.error(`Cannot retry ${functionName} without a message body for context.`);
return null;
}

const { remoteJid } = bodyForRetry.key;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);

const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid} during retry.`);
return null;
}

this.logger.log(`Retrying ${functionName} for ${remoteJid} with new conversation ${newConversationId}`);
return await originalFunction(newConversationId);
} else {
this.logger.error(`Error in ${functionName}: ${error}`);
throw error;
}
}

public async getOpenConversationByContact(
Expand Down Expand Up @@ -987,6 +1045,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
if (sourceId && this.isImportHistoryAvailable()) {
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
Expand All @@ -997,54 +1056,65 @@ export class ChatwootService {
}
}
}
const data = new FormData();
const doSendData = async (convId: number) => {
const data = new FormData();

if (content) {
data.append('content', content);
}
if (content) {
data.append('content', content);
}

data.append('message_type', messageType);
data.append('message_type', messageType);

data.append('attachments[]', fileStream, { filename: fileName });
data.append('attachments[]', fileStream, { filename: fileName });

const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;

if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);

if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
}
}
}

if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}

if (sourceId) {
data.append('source_id', sourceId);
}
if (sourceId) {
data.append('source_id', sourceId);
}

const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
};

const { data: responseData } = await axios.request(config);
return responseData;
};

try {
const { data } = await axios.request(config);

return data;
return await doSendData(conversationId);
} catch (error) {
this.logger.error(error);
return this.handleStaleConversationError(
error,
instance,
conversationId,
messageBody,
messageBodyForRetry,
'sendData',
(newConvId) => doSendData(newConvId),
);
}
}

Expand Down Expand Up @@ -2032,6 +2102,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2051,6 +2122,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2076,6 +2148,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
quotedMsg,
body,
);
if (!send) {
this.logger.warn('message not sent');
Expand Down Expand Up @@ -2132,6 +2205,8 @@ export class ChatwootService {
instance,
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand Down Expand Up @@ -2173,6 +2248,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2192,6 +2268,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand Down Expand Up @@ -2262,6 +2339,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
null,
body,
);
if (!send) {
this.logger.warn('edited message not sent');
Expand Down Expand Up @@ -2515,7 +2593,8 @@ export class ChatwootService {
and created_at >= now() - interval '6h'
order by created_at desc`;

const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
const pgClient = await this.getPgClient();
const messagesData = (await pgClient.query(sqlMessages))?.rows;
const ids: string[] = messagesData
.filter((message) => !!message.source_id)
.map((message) => message.source_id.replace('WAID:', ''));
Expand Down