Conversation
| @@ -1,5 +1,5 @@ | |||
| __version__ = "0.32.7" | |||
| __db_version__ = 7 | |||
| __db_version__ = 8 | |||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool | ||
| FOR EACH ROW |
|
|
||
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_pool |
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
|
||
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE |
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan |
|
|
||
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE |
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; | ||
| RETURN NEW; |
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; |
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf | ||
| FOR EACH ROW |
|
|
||
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf |
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
|
||
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE |
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); |
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan | ||
| FOR EACH ROW |
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan |
| -- disable these triggers at startup depending on configuration. | ||
| -- | ||
| CREATE TABLE IF NOT EXISTS kafka_produce_event ( | ||
| id SERIAL PRIMARY KEY, |
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; | ||
| RETURN NEW; |
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; |
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); |
There was a problem hiding this comment.
indentation contains tabs
line too long (123 > 79 characters)
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN |
1b5f450 to
b2e086d
Compare
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); |
There was a problem hiding this comment.
indentation contains tabs
line too long (123 > 79 characters)
|
|
||
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN |
| conn.rollback() | ||
|
|
||
| except psycopg2.InterfaceError as e: | ||
| LOG.error("Database interface error in kafka_producer loop: %s. Reconnecting to database.", e) |
There was a problem hiding this comment.
line too long (106 > 79 characters)
|
|
||
| # mark processed for ids that were attempted | ||
| if ids: | ||
| cur.execute("UPDATE kafka_produce_event SET processed = TRUE WHERE id = ANY(%s);", (ids,)) |
There was a problem hiding this comment.
line too long (106 > 79 characters)
| LOG.info("Attempting to recreate Kafka producer after flush failure") | ||
| producer = _ensure_producer(cfg) | ||
| if producer is None: | ||
| LOG.error("Unable to recreate Kafka producer; exiting kafka_producer process") |
There was a problem hiding this comment.
line too long (98 > 79 characters)
| send_failed = True | ||
| break | ||
| except Exception as e: | ||
| # if a single event fails to prepare, log and skip it for now |
There was a problem hiding this comment.
line too long (81 > 79 characters)
| ids.append(event_id) | ||
| else: | ||
| # sending failed (producer might be disconnected); mark to recreate producer | ||
| LOG.error("Failed to send event id %s to topic %s; will attempt to recreate producer", event_id, topic) |
There was a problem hiding this comment.
line too long (127 > 79 characters)
| if sent: | ||
| ids.append(event_id) | ||
| else: | ||
| # sending failed (producer might be disconnected); mark to recreate producer |
There was a problem hiding this comment.
line too long (100 > 79 characters)
| topic = _table_to_topic(topic_prefix, table) | ||
| message = {'event_type': etype, 'payload': payload} | ||
| # send with retries and exponential backoff | ||
| sent = _send_with_backoff(producer, topic, message, payload.get('id')) |
There was a problem hiding this comment.
line too long (90 > 79 characters)
| producer = _ensure_producer(cfg) | ||
| if producer is None: | ||
| # _ensure_producer returns None if kafka-python-ng isn't installed or config invalid | ||
| LOG.error("Kafka producer not available (missing dependency or bad config), exiting kafka_producer process") |
There was a problem hiding this comment.
line too long (116 > 79 characters)
No description provided.