const mongoose = require("mongoose");

// Schema with a single numeric field, `updated`, which updateAction() sets to 1.
const actionSchema = new mongoose.Schema({
  updated: Number,
});

const Action = mongoose.model("Action", actionSchema);

// NOTE(review): this schema is not referenced anywhere else in this file and is
// never registered as a model — confirm whether it is still needed.
const userSchema = new mongoose.Schema({ name: String });

/**
 * Sets `updated` to 1 on the first Action document matched by an empty filter,
 * inserting a document when none exists (upsert).
 *
 * Failures are logged and NOT rethrown, so callers never see a rejection.
 *
 * @returns {Promise<void>}
 */
async function updateAction() {
  try {
    await Action.updateOne({}, { $set: { updated: 1 } }, { upsert: true });
    console.log("✅ Cập nhật thành công");
  } catch (err) {
    console.error("❌ Lỗi cập nhật:", err);
  }
}

/**
 * Opens a database-wide change stream and calls updateAction() for every
 * change event whose collection is not "actions".
 *
 * May throw synchronously if watch() itself throws; the caller handles that.
 *
 * @param {object} db - Database handle taken from `mongoose.connection.db`.
 * @returns {void}
 */
function startChangeStream(db) {
  const changeStream = db.watch([
    {
      $match: {
        // Skip the "actions" collection — presumably because updateAction()
        // writes through the Action model, and watching its own writes would
        // make every update trigger another one.
        "ns.coll": { $ne: "actions" },
      },
    },
  ]);

  changeStream.on("change", async (change) => {
    try {
      console.log("📢 Change detected:", change);
      await updateAction();
    } catch (e) {
      console.error("Error handling change event:", e);
    }
  });

  // Defensive: an 'error' event with no listener would be thrown by the
  // emitter, so always attach one.
  changeStream.on("error", async (err) => {
    console.warn(
      "⚠️ ChangeStream error — disabling change stream:",
      err && err.message ? err.message : err,
    );
    try {
      // close() returns a promise. Awaiting it inside the try means a rejected
      // close is caught here instead of surfacing as an unhandled rejection
      // (a plain synchronous try/catch around the call would not catch it).
      await changeStream.close();
    } catch (closeErr) {
      // Deliberately ignored: closing is best-effort on a stream that has
      // already failed, and the original error was logged above.
    }
  });
}

/**
 * Connects mongoose to the URI in `process.env.MONGODB_URI` and, when the
 * server reports a replica set name, starts a change stream that calls
 * updateAction() on changes outside the "actions" collection.
 *
 * If connecting or the `isMaster` command fails, the error message is logged
 * and the process exits with code 1. A failure to start the change stream is
 * only logged.
 *
 * @returns {Promise<void>}
 */
const connectDB = async () => {
  try {
    const conn = await mongoose.connect(process.env.MONGODB_URI);
    console.log(`✅ MongoDB Connected:[ ${conn.connection.host} ]`);

    // Grab the db handle only after the connection has succeeded.
    const db = mongoose.connection.db;

    // The admin interface is used to run server commands.
    const admin = db.admin();
    const isMaster = await admin.command({ isMaster: 1 });

    // Only create a change stream when connected to a replica set. On
    // standalone servers the $changeStream aggregation stage isn't supported,
    // and attempting to watch() emits an 'error' event that can crash the
    // process if not handled.
    if (!(isMaster && isMaster.setName)) {
      console.log("ℹ️ ChangeStream skipped (no replica set detected)");
      return;
    }

    try {
      startChangeStream(db);
    } catch (e) {
      // If watch() throws synchronously (older drivers/servers), skip it.
      console.log(
        "ℹ️ ChangeStream not initialized (watch() failed):",
        e.message || e,
      );
    }
  } catch (error) {
    console.error(`Error: ${error.message}`);
    process.exit(1);
  }
};

module.exports = connectDB;