Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SERVER_NAME=evolution
SERVER_TYPE=http
SERVER_PORT=8080
# Server URL - Set your application url
Expand Down Expand Up @@ -96,6 +97,36 @@ SQS_SECRET_ACCESS_KEY=
SQS_ACCOUNT_ID=
SQS_REGION=

SQS_GLOBAL_ENABLED=false
SQS_GLOBAL_FORCE_SINGLE_QUEUE=false
SQS_GLOBAL_APPLICATION_STARTUP=false
SQS_GLOBAL_CALL=false
SQS_GLOBAL_CHATS_DELETE=false
SQS_GLOBAL_CHATS_SET=false
SQS_GLOBAL_CHATS_UPDATE=false
SQS_GLOBAL_CHATS_UPSERT=false
SQS_GLOBAL_CONNECTION_UPDATE=false
SQS_GLOBAL_CONTACTS_SET=false
SQS_GLOBAL_CONTACTS_UPDATE=false
SQS_GLOBAL_CONTACTS_UPSERT=false
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
SQS_GLOBAL_GROUPS_UPDATE=false
SQS_GLOBAL_GROUPS_UPSERT=false
SQS_GLOBAL_LABELS_ASSOCIATION=false
SQS_GLOBAL_LABELS_EDIT=false
SQS_GLOBAL_LOGOUT_INSTANCE=false
SQS_GLOBAL_MESSAGES_DELETE=false
SQS_GLOBAL_MESSAGES_EDITED=false
SQS_GLOBAL_MESSAGES_SET=false
SQS_GLOBAL_MESSAGES_UPDATE=false
SQS_GLOBAL_MESSAGES_UPSERT=false
SQS_GLOBAL_PRESENCE_UPDATE=false
SQS_GLOBAL_QRCODE_UPDATED=false
SQS_GLOBAL_REMOVE_INSTANCE=false
SQS_GLOBAL_SEND_MESSAGE=false
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
SQS_GLOBAL_TYPEBOT_START=false

# Websocket - Environment variables
WEBSOCKET_ENABLED=false
WEBSOCKET_GLOBAL_EVENTS=false
Expand Down
217 changes: 136 additions & 81 deletions src/api/integrations/event/sqs/sqs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
import { configService, Log, Sqs } from '@config/env.config';
import { configService, HttpServer, Log, S3, Sqs } from '@config/env.config';
import { Logger } from '@config/logger.config';

import { EmitData, EventController, EventControllerInterface } from '../event.controller';
Expand All @@ -15,27 +16,29 @@
super(prismaRepository, waMonitor, configService.get<Sqs>('SQS')?.ENABLED, 'sqs');
}

public init(): void {
public async init(): Promise<void> {
if (!this.status) {
return;
}

new Promise<void>((resolve) => {
const awsConfig = configService.get<Sqs>('SQS');
const awsConfig = configService.get<Sqs>('SQS');

this.sqs = new SQS({
credentials: {
accessKeyId: awsConfig.ACCESS_KEY_ID,
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
},
this.sqs = new SQS({
credentials: {
accessKeyId: awsConfig.ACCESS_KEY_ID,
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
},

region: awsConfig.REGION,
});
region: awsConfig.REGION,
});

this.logger.info('SQS initialized');
this.logger.info('SQS initialized');

resolve();
});
const sqsConfig = configService.get<Sqs>('SQS');
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
}
}

private set channel(sqs: SQS) {
Expand All @@ -47,7 +50,7 @@
}

override async set(instanceName: string, data: EventDto): Promise<any> {
if (!this.status) {
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
return;
}

Expand Down Expand Up @@ -75,6 +78,7 @@
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
};

console.log('*** payload: ', payload);
return this.prisma[this.name].upsert(payload);
}
Expand All @@ -98,100 +102,151 @@
return;
}

const instanceSqs = await this.get(instanceName);
const sqsLocal = instanceSqs?.events;
const we = event.replace(/[.-]/gm, '_').toUpperCase();

if (instanceSqs?.enabled) {
if (this.sqs) {
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${instanceName}_${eventFormatted}.fifo`;
const sqsConfig = configService.get<Sqs>('SQS');
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
QueueUrl: sqsUrl,
};

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
}
if (this.sqs) {
const serverConfig = configService.get<HttpServer>('SERVER');
const sqsConfig = configService.get<Sqs>('SQS');

const we = event.replace(/[.-]/gm, '_').toUpperCase();

let sqsEvents = [];
if (sqsConfig.GLOBAL_ENABLED) {
sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
} else {
const instanceSqs = await this.get(instanceName);
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
sqsEvents = instanceSqs?.events;
}
}

if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
const eventFormatted =
sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE
? 'singlequeue'
: `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${prefixName}_${eventFormatted}.fifo`;
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
dataType: 'json',
data,
server: serverConfig.NAME,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const jsonStr = JSON.stringify(message);
const size = Buffer.byteLength(jsonStr, 'utf8');
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
if (!configService.get<S3>('S3').ENABLE) {
this.logger.error(
`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`,
);
return;
}

const buffer = Buffer.from(jsonStr, 'utf8');
const fullName = `messages/${instanceName}_${eventFormatted}_${Date.now()}.json`;

await s3Service.uploadFile(fullName, buffer, size, {
'Content-Type': 'application/json',
'Cache-Control': 'no-store',
});

const fileUrl = await s3Service.getObjectUrl(fullName);

message.data = { fileUrl };
message.dataType = 's3';
}

const messageGroupId = sqsConfig.GLOBAL_ENABLED ? `${serverConfig.NAME}-${eventFormatted}-${instanceName}` : 'evolution';

Check failure on line 166 in src/api/integrations/event/sqs/sqs.controller.ts

View workflow job for this annotation

GitHub Actions / check-lint-and-build

Replace `·?·`${serverConfig.NAME}-${eventFormatted}-${instanceName}`` with `⏎··········?·`${serverConfig.NAME}-${eventFormatted}-${instanceName}`⏎·········`
const isGlobalEnabled = sqsConfig.GLOBAL_ENABLED;
const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: messageGroupId,
QueueUrl: sqsUrl,
...(!isGlobalEnabled && {
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
}),
};

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
params: JSON.stringify(message),
sqsUrl: sqsUrl,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
});
}
}
}

private async saveQueues(instanceName: string, events: string[], enable: boolean) {
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
if (enable) {
const eventsFinded = await this.listQueuesByInstance(instanceName);
const sqsConfig = configService.get<Sqs>('SQS');
const eventsFinded = await this.listQueues(prefixName);
console.log('eventsFinded', eventsFinded);

for (const event of events) {
const normalizedEvent = event.toLowerCase();

const normalizedEvent =
sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE ? 'singlequeue' : event.toLowerCase();
if (eventsFinded.includes(normalizedEvent)) {
this.logger.info(`A queue para o evento "${normalizedEvent}" já existe. Ignorando criação.`);
continue;
}

const queueName = `${instanceName}_${normalizedEvent}.fifo`;

const queueName = `${prefixName}_${normalizedEvent}.fifo`;
try {
const isGlobalEnabled = sqsConfig.GLOBAL_ENABLED;
const createCommand = new CreateQueueCommand({
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }),
},
});

const data = await this.sqs.send(createCommand);
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
} catch (err: any) {
this.logger.error(`Erro ao criar queue ${queueName}: ${err.message}`);
}

if (sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE) {
break;
}
}
}
}

private async listQueuesByInstance(instanceName: string) {
private async listQueues(prefixName: string) {
let existingQueues: string[] = [];

try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
QueueNamePrefix: `${prefixName}_`,
});

const listData = await this.sqs.send(listCommand);
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
// Extrai o nome da fila a partir da URL
Expand All @@ -201,32 +256,32 @@
});
}
} catch (error: any) {
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
return;
}

// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
return existingQueues
.map((queueName) => {
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
}
return '';
})
.filter((event) => event !== '');
}

// Para uma futura feature de exclusão forçada das queues
private async removeQueuesByInstance(instanceName: string) {
private async removeQueuesByInstance(prefixName: string) {
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
QueueNamePrefix: `${prefixName}_`,
});
const listData = await this.sqs.send(listCommand);

if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
this.logger.info(`No queues found for instance ${instanceName}`);
this.logger.info(`No queues found for ${prefixName}`);
return;
}

Expand All @@ -240,7 +295,7 @@
}
}
} catch (err: any) {
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
}
}
}
Loading
Loading