/*
This file is part of datajams-evictions.
datajams-evictions is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
datajams-evictions is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with datajams-evictions. If not, see <https://www.gnu.org/licenses/>.
Copyright 2020 Luigi Bai
*/
const sqDAO = require("./lib/sqlDAO");
const msDAO = require("./lib/msDAO");
/**
 * Copies every row of one table from a source database handle to a sink
 * database handle.
 *
 * The instance is configured entirely through the constructor props:
 *   - tableName          {string}   table counted in the source and emptied in the sink
 *   - sqlRowSEL          {string}   SELECT statement prepared on the source
 *   - sqlRowINS          {string}   INSERT statement prepared on the sink
 *   - insBind            {function} (destD, insStmt) => void; declares the INSERT parameters
 *   - ignoreInsertErrors {boolean}  optional; when truthy a failed insert is logged and skipped
 *
 * State written during a transfer:
 *   - sourceRowCount  row count reported by the source (set by sourceCount())
 *   - sinkCount       number of rows inserted into the sink
 */
class xferCtlr {
  /**
   * @param {object} props - configuration, copied verbatim onto the instance.
   */
  constructor(props) {
    Object.assign(this, props);
  }

  /**
   * Count all the rows in the source table.
   *
   * The result is cached in `this.sourceRowCount`. It is deliberately NOT
   * stored in `this.sourceCount`: doing so would replace this method on the
   * instance and make a second transfer with the same controller fail.
   *
   * @param {object} sqD - source handle exposing `get(sql, callback)`.
   * @returns {Promise<number>} resolves with the `num` column of the count row;
   *   rejects with an Error whose message starts with "COUNT: ".
   */
  sourceCount(sqD) {
    return new Promise((resFN, rejFN) => {
      sqD.get(`SELECT COUNT(*) num FROM ${this.tableName}`, (err, count) => {
        if (err) {
          rejFN(new Error(`COUNT: ${err}`));
          return;
        }
        this.sourceRowCount = count.num;
        resFN(count.num);
      });
    });
  }

  /**
   * Delete all the rows from the sink table.
   *
   * @param {object} msD - sink handle exposing a `Request` constructor whose
   *   instances provide `batch(sql)` and `cancel()`.
   * @returns {Promise<{msg: string, val: *}>} `val` is whatever `batch()` resolved with.
   * @throws {Error} wrapping any failure raised while deleting.
   */
  async sinkDelete(msD) {
    try {
      const msReq = new msD.Request();
      const res = await msReq.batch(`DELETE FROM ${this.tableName}`);
      // NOTE(review): cancel() runs after batch() has already resolved; kept
      // from the original implementation - confirm whether it is needed.
      await msReq.cancel();
      return { msg: `DELETE FROM ${this.tableName} done`, val: res };
    } catch (e) {
      throw new Error(e);
    }
  }

  /**
   * Save one row, then schedule the next one from the completion callback.
   *
   * @param {object} srcS - source statement exposing `get(callback)`; a row of
   *   `undefined` marks the end of the result set.
   * @param {object} destS - sink statement exposing `execute(row, callback)`.
   * @param {function(number)} resFN - called once with the number of inserted rows.
   * @param {function({e: *, n: number})} rejFN - called once with the failure
   *   and the number of rows inserted before it.
   * @param {number} count - rows inserted so far.
   */
  _oneRow(srcS, destS, resFN, rejFN, count) {
    srcS.get((err, row) => {
      if (err) {
        rejFN({ e: err, n: count });
        return;
      }
      if (undefined === row) {
        resFN(count);
        return;
      }
      destS.execute(row, (insErr) => {
        if (insErr) {
          if (!this.ignoreInsertErrors) {
            rejFN({ e: insErr, n: count });
            return;
          }
          // Skip the bad row but keep going; without this call the transfer
          // would stall forever. The row is not counted as inserted.
          console.error(insErr);
          this._oneRow(srcS, destS, resFN, rejFN, count);
          return;
        }
        if (count % 100 === 0) {
          console.log(`${this.tableName} Count: ${count}`);
        }
        this._oneRow(srcS, destS, resFN, rejFN, count + 1);
      });
    });
  }

  /**
   * Start the rows transfer. Calls _oneRow per row and settles through the
   * supplied callbacks exactly once.
   *
   * @param {object} srcD - source handle exposing `prepare(sql)`.
   * @param {object} destD - sink handle exposing a `PreparedStatement` constructor.
   * @param {function(object)} resFN - receives `{tbl, msg, sourceRows, sinkRows}`.
   * @param {function(Error)} rejFN - receives the failure.
   */
  _doRows(srcD, destD, resFN, rejFN) {
    this.sinkCount = 0;
    const selStmt = srcD.prepare(this.sqlRowSEL);
    const insStmt = new destD.PreparedStatement();
    this.insBind(destD, insStmt);

    // Release both statements; unprepare is attempted even if finalize throws.
    const release = async () => {
      try {
        selStmt.finalize();
      } finally {
        await insStmt.unprepare();
      }
    };

    insStmt
      .prepare(this.sqlRowINS)
      .then(() => {
        this._oneRow(
          selStmt,
          insStmt,
          async (val) => {
            this.sinkCount = val;
            try {
              await release();
            } catch (e) {
              // A failed cleanup must still settle the transfer.
              rejFN(e);
              return;
            }
            resFN({
              tbl: this.tableName,
              msg: "finished",
              sourceRows: this.sourceRowCount,
              sinkRows: this.sinkCount,
            });
          },
          async (errO) => {
            this.sinkCount = errO.n;
            try {
              await release();
            } catch (e) {
              // Report the transfer failure below; the cleanup failure is secondary.
              console.error(e);
            }
            rejFN(new Error(`${errO.e} at row ${this.sinkCount}`));
          },
          0
        );
      })
      .catch((e) => {
        // The select statement was opened before this failure; do not leak it.
        try {
          selStmt.finalize();
        } catch (finErr) {
          console.error(finErr);
        }
        rejFN(e);
      });
  }

  /**
   * Transfer all the rows from source to sink: count the source rows, empty
   * the sink table, then copy row by row.
   *
   * Any failure in any of the three steps rejects the returned promise, so a
   * caller awaiting it is never left hanging.
   *
   * @param {object} srcD - source database handle.
   * @param {object} destD - sink database handle.
   * @returns {Promise<{tbl: string, msg: string, sourceRows: number, sinkRows: number}>}
   */
  async xferPromise(srcD, destD) {
    await this.sourceCount(srcD);
    await this.sinkDelete(destD);
    // _doRows is callback based, so this is the one place a Promise is built by hand.
    return new Promise((resFN, rejFN) => {
      this._doRows(srcD, destD, resFN, rejFN);
    });
  }
}
// Transfer definition for the "cases" table. The SELECT aliases each column
// to the parameter name used by the INSERT.
let casesCtlr = new xferCtlr({
  tableName: "cases",
  sqlRowSEL: "SELECT casenumber cnum, case_URL curl, filed_date fdt, case_status cstat FROM cases",
  sqlRowINS: "INSERT INTO cases (casenumber, case_URL, filed_date, case_status) VALUES (@cnum, @curl, @fdt, @cstat)",
  // Declare every INSERT parameter as NVarChar on the prepared statement.
  insBind(db, stmt) {
    for (const param of ["cnum", "curl", "fdt", "cstat"]) {
      stmt.input(param, db.NVarChar);
    }
  },
});
// Transfer definition for the "docket" table. The SELECT aliases each column
// to the parameter name used by the INSERT.
let docketsCtlr = new xferCtlr({
  tableName: "docket",
  sqlRowSEL: "SELECT precinct pct, place plc, docket_dateTime ddt, URL url FROM docket",
  sqlRowINS: "INSERT INTO docket (precinct, place, docket_dateTime, URL) VALUES (@pct, @plc, @ddt, @url)",
  // Declare the INSERT parameters: the two integer keys first, then the text columns.
  insBind(db, stmt) {
    ["pct", "plc"].forEach((param) => {
      stmt.input(param, db.Int);
    });
    ["ddt", "url"].forEach((param) => {
      stmt.input(param, db.NVarChar);
    });
  },
});
// Transfer definition for the "docketedcases" table. The SELECT aliases each
// column to the parameter name used by the INSERT.
let dcCtlr = new xferCtlr({
  tableName: "docketedcases",
  sqlRowSEL: "SELECT precinct pct, place plc, docket_dateTime ddt, casenumber cnum, claim clm FROM docketedcases",
  sqlRowINS: "INSERT INTO docketedcases (precinct, place, docket_dateTime, casenumber, claim) VALUES (@pct, @plc, @ddt, @cnum, @clm)",
  // Declare the INSERT parameters: the two integer keys first, then the text columns.
  insBind(db, stmt) {
    ["pct", "plc"].forEach((param) => {
      stmt.input(param, db.Int);
    });
    ["ddt", "cnum", "clm"].forEach((param) => {
      stmt.input(param, db.NVarChar);
    });
  },
});
// Transfer definition for the "party" table. The SELECT aliases each column
// to the parameter name used by the INSERT.
let partyCtlr = new xferCtlr({
  tableName: "party",
  sqlRowSEL: "SELECT casenumber cnum, party_role pr, party_name pn, party_address pa FROM party",
  sqlRowINS: "INSERT INTO party (casenumber, party_role, party_name, party_address) VALUES (@cnum, @pr, @pn, @pa)",
  // Declare every INSERT parameter as NVarChar; the case number is declared last.
  insBind(db, stmt) {
    for (const param of ["pr", "pn", "pa", "cnum"]) {
      stmt.input(param, db.NVarChar);
    }
  },
});
// Transfer definition for the "events" table. The SELECT aliases each column
// to the parameter name used by the INSERT.
let eventsCtlr = new xferCtlr({
  tableName: "events",
  sqlRowSEL: "SELECT casenumber cnum, eventDescription ed, dateAdded da FROM events",
  sqlRowINS: "INSERT INTO events (casenumber, eventDescription, dateAdded) VALUES (@cnum, @ed, @da)",
  // Declare every INSERT parameter as NVarChar on the prepared statement.
  insBind(db, stmt) {
    for (const param of ["cnum", "ed", "da"]) {
      stmt.input(param, db.NVarChar);
    }
  },
});
// Entry point. The SQLite source is connected first; its connect callback
// then connects the MS SQL sink, and the transfers run from the sink's
// connect callback once both handles are available.
let opts = require("./creds")["SQLITE3"];

opts.connectCallback = (sqDB) => {
  // Credentials for the sink are only looked up once the source is open.
  const msOpts = require("./creds")["MS"];

  msOpts.connectCallback = async (msDB) => {
    try {
      // Tables are copied strictly one after another, in this fixed order.
      // (An opt-in `partyCtlr.ignoreInsertErrors = true` was left disabled
      // in the original ahead of the party transfer; it remains disabled.)
      const transferOrder = [casesCtlr, docketsCtlr, dcCtlr, partyCtlr, eventsCtlr];
      for (const ctlr of transferOrder) {
        console.log(await ctlr.xferPromise(sqDB, msDB));
      }
    } catch (e) {
      // The first failed transfer aborts the remaining ones.
      console.error(e);
    } finally {
      // Both connections are closed whether or not the transfers succeeded.
      console.log("Shutting connections");
      msDAO.disconnect(msDB);
      sqDAO.disconnect(sqDB);
    }
  };

  msDAO.connect(msOpts);
};

sqDAO.connect(opts);