Naposledy aktivní 1722262439

worker.rs Raw
// NOTE: mlua's `Lua` is not `Sync`, so the Lua VM is driven through tokio's `spawn_local`,
// which lets it run in an async context while staying pinned to a single thread.
//
// This is highly experimental.
5pub struct LuaWorker {
6 /// A handle that allows stopping the VM inside its tokio localset
7 ///
8 /// This is wrapped in an option to allow destroying the handle when the LuaWorker is dropped
9 pub tx_stop: Option<tokio::sync::oneshot::Sender<()>>,
10 /// A channel used for sending requests to the VM
11 pub tx_msg_recv: tokio::sync::mpsc::Sender<LuaWorkerRequest>,
12 /// A channel that can be used to listen for a response from the VM
13 pub rx_msg_resp: tokio::sync::mpsc::Receiver<LuaWorkerResponse>,
14}
15
16impl LuaWorker {
17 // Executes a Lua script in the Lua VM
18 pub async fn exec(
19 vm: &Lua,
20 template: &str,
21 args: serde_json::Value,
22 ) -> Result<serde_json::Value, base_data::Error> {
23 let f: LuaFunction = vm
24 .load(template)
25 .eval_async()
26 .await
27 .map_err(|e| LuaError::external(e.to_string()))?;
28
29 let _args = vm
30 .create_table()
31 .map_err(|e| LuaError::external(e.to_string()))?;
32
33 let args = vm
34 .to_value(&args)
35 .map_err(|e| LuaError::external(e.to_string()))?;
36
37 _args
38 .set("args", args)
39 .map_err(|e| LuaError::external(e.to_string()))?;
40
41 let v: LuaValue = f
42 .call_async(_args)
43 .await
44 .map_err(|e| LuaError::external(e.to_string()))?;
45
46 let v = serde_json::to_value(v).map_err(|e| LuaError::external(e.to_string()))?;
47
48 Ok(v)
49 }
50
51 // Spawns a new LuaWorker with the given Lua VM
52 pub fn new(lua: Lua) -> Self {
53 let (tx_stop, rx_stop) = tokio::sync::oneshot::channel();
54 let (tx_msg_recv, rx_msg_recv) = tokio::sync::mpsc::channel(32);
55 let (tx_msg_resp, rx_msg_resp) = tokio::sync::mpsc::channel(32);
56
57 let worker = LuaWorker {
58 tx_stop: Some(tx_stop),
59 tx_msg_recv,
60 rx_msg_resp,
61 };
62
63 let rt = tokio::runtime::Builder::new_current_thread()
64 .enable_all()
65 .build()
66 .unwrap();
67
68 std::thread::spawn(move || {
69 rt.block_on(async move {
70 let lua = Arc::new(lua);
71 let mut rx_stop = rx_stop;
72 let mut rx_msg_recv = rx_msg_recv;
73 let mut tx_msg_resp = tx_msg_resp;
74
75 loop {
76 let lua = lua.clone();
77 let tx_msg_resp = tx_msg_resp.clone();
78
79 tokio::select! {
80 /*_ = rx_stop => {
81 break;
82 },*/
83 Some(msg) = rx_msg_recv.recv() => {
84 tokio::task::spawn_local(async move {
85 let lua = lua.clone();
86 let res = LuaWorker::exec(&lua, &msg.template, msg.args).await;
87 let _ = tx_msg_resp.send(match res {
88 Ok(v) => LuaWorkerResponse::Ok(v),
89 Err(e) => LuaWorkerResponse::Err(e.to_string()),
90 });
91 });
92 }
93 }
94 }
95 });
96 });
97
98 worker
99 }
100}
101
102impl Drop for LuaWorker {
103 fn drop(&mut self) {
104 if let Some(sender) = self.tx_stop.take() {
105 let _ = sender.send(());
106 }
107 }
108}
109
110pub struct LuaWorkerRequest {
111 pub template: String,
112 pub args: serde_json::Value,
113}
114
115pub enum LuaWorkerResponse {
116 Ok(serde_json::Value),
117 Err(String),
118}
119