Last active 1722262439

rootspring's Avatar rootspring revised this gist 1722262439. Go to revision

1 file changed, 118 insertions

worker.rs(file created)

@@ -0,0 +1,118 @@
1 + // NOTE: Because the mlua crate is not Sync, we can use tokio spawn_local to run the Lua VM in an async context
2 + // but pinned to a single thread
3 + //
4 + // This is highly experimental
5 + pub struct LuaWorker {
6 + /// A handle that allows stopping the VM inside its tokio localset
7 + ///
8 + /// This is wrapped in an option to allow destroying the handle when the LuaWorker is dropped
9 + pub tx_stop: Option<tokio::sync::oneshot::Sender<()>>,
10 + /// A channel used for sending requests to the VM
11 + pub tx_msg_recv: tokio::sync::mpsc::Sender<LuaWorkerRequest>,
12 + /// A channel that can be used to listen for a response from the VM
13 + pub rx_msg_resp: tokio::sync::mpsc::Receiver<LuaWorkerResponse>,
14 + }
15 +
16 + impl LuaWorker {
17 + // Executes a Lua script in the Lua VM
18 + pub async fn exec(
19 + vm: &Lua,
20 + template: &str,
21 + args: serde_json::Value,
22 + ) -> Result<serde_json::Value, base_data::Error> {
23 + let f: LuaFunction = vm
24 + .load(template)
25 + .eval_async()
26 + .await
27 + .map_err(|e| LuaError::external(e.to_string()))?;
28 +
29 + let _args = vm
30 + .create_table()
31 + .map_err(|e| LuaError::external(e.to_string()))?;
32 +
33 + let args = vm
34 + .to_value(&args)
35 + .map_err(|e| LuaError::external(e.to_string()))?;
36 +
37 + _args
38 + .set("args", args)
39 + .map_err(|e| LuaError::external(e.to_string()))?;
40 +
41 + let v: LuaValue = f
42 + .call_async(_args)
43 + .await
44 + .map_err(|e| LuaError::external(e.to_string()))?;
45 +
46 + let v = serde_json::to_value(v).map_err(|e| LuaError::external(e.to_string()))?;
47 +
48 + Ok(v)
49 + }
50 +
51 + // Spawns a new LuaWorker with the given Lua VM
52 + pub fn new(lua: Lua) -> Self {
53 + let (tx_stop, rx_stop) = tokio::sync::oneshot::channel();
54 + let (tx_msg_recv, rx_msg_recv) = tokio::sync::mpsc::channel(32);
55 + let (tx_msg_resp, rx_msg_resp) = tokio::sync::mpsc::channel(32);
56 +
57 + let worker = LuaWorker {
58 + tx_stop: Some(tx_stop),
59 + tx_msg_recv,
60 + rx_msg_resp,
61 + };
62 +
63 + let rt = tokio::runtime::Builder::new_current_thread()
64 + .enable_all()
65 + .build()
66 + .unwrap();
67 +
68 + std::thread::spawn(move || {
69 + rt.block_on(async move {
70 + let lua = Arc::new(lua);
71 + let mut rx_stop = rx_stop;
72 + let mut rx_msg_recv = rx_msg_recv;
73 + let mut tx_msg_resp = tx_msg_resp;
74 +
75 + loop {
76 + let lua = lua.clone();
77 + let tx_msg_resp = tx_msg_resp.clone();
78 +
79 + tokio::select! {
80 + /*_ = rx_stop => {
81 + break;
82 + },*/
83 + Some(msg) = rx_msg_recv.recv() => {
84 + tokio::task::spawn_local(async move {
85 + let lua = lua.clone();
86 + let res = LuaWorker::exec(&lua, &msg.template, msg.args).await;
87 + let _ = tx_msg_resp.send(match res {
88 + Ok(v) => LuaWorkerResponse::Ok(v),
89 + Err(e) => LuaWorkerResponse::Err(e.to_string()),
90 + });
91 + });
92 + }
93 + }
94 + }
95 + });
96 + });
97 +
98 + worker
99 + }
100 + }
101 +
102 + impl Drop for LuaWorker {
103 + fn drop(&mut self) {
104 + if let Some(sender) = self.tx_stop.take() {
105 + let _ = sender.send(());
106 + }
107 + }
108 + }
109 +
110 + pub struct LuaWorkerRequest {
111 + pub template: String,
112 + pub args: serde_json::Value,
113 + }
114 +
115 + pub enum LuaWorkerResponse {
116 + Ok(serde_json::Value),
117 + Err(String),
118 + }
Newer Older