rootspring revised this gist . Go to revision
1 file changed, 180 insertions
data_tasks.go(file created)
@@ -0,0 +1,180 @@ | |||
1 | + | package assets | |
2 | + | ||
3 | + | import ( | |
4 | + | "popplio/state" | |
5 | + | ||
6 | + | "github.com/infinitybotlist/eureka/dovewing" | |
7 | + | jsoniter "github.com/json-iterator/go" | |
8 | + | "go.uber.org/zap" | |
9 | + | ) | |
10 | + | ||
11 | + | var json = jsoniter.ConfigFastest | |
12 | + | ||
13 | + | func DataTask(taskId, taskName, id, ip string) { | |
14 | + | del := taskName == "data_delete" | |
15 | + | ||
16 | + | var done bool | |
17 | + | ||
18 | + | l, _ := newTaskLogger(taskId) | |
19 | + | ||
20 | + | // Fail failed tasks | |
21 | + | defer func() { | |
22 | + | if !done { | |
23 | + | l.Error("Failed to complete task", zap.String("id", id), zap.Bool("del", del)) | |
24 | + | ||
25 | + | _, err := state.Pool.Exec(state.Context, "UPDATE tasks SET state = $1 WHERE task_id = $2", "failed", taskId) | |
26 | + | ||
27 | + | if err != nil { | |
28 | + | l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
29 | + | } | |
30 | + | } | |
31 | + | }() | |
32 | + | ||
33 | + | tx, err := state.Pool.Begin(state.Context) | |
34 | + | ||
35 | + | if err != nil { | |
36 | + | l.Error("Failed to begin transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
37 | + | return | |
38 | + | } | |
39 | + | ||
40 | + | defer tx.Rollback(state.Context) | |
41 | + | ||
42 | + | _, err = tx.Exec(state.Context, "DELETE FROM tasks WHERE task_name = $1 AND task_id != $2 AND for_user = $3", taskName, taskId, id) | |
43 | + | ||
44 | + | if err != nil { | |
45 | + | l.Error("Failed to delete old data tasks", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
46 | + | return | |
47 | + | } | |
48 | + | ||
49 | + | l.Info("Started DR/DDR task", zap.String("id", id), zap.Bool("del", del)) | |
50 | + | ||
51 | + | tableRefs, err := getAllTableRefs(tx) | |
52 | + | ||
53 | + | if err != nil { | |
54 | + | l.Error("Failed to get table refs", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
55 | + | return | |
56 | + | } | |
57 | + | ||
58 | + | // Begin fetching data recursively | |
59 | + | collectedData := map[string][]any{} | |
60 | + | cachedEntityIds := map[string][]string{} | |
61 | + | for _, tableRef := range tableRefs { | |
62 | + | fOp, ok := tableOps[tableRef.ForeignTableName] | |
63 | + | ||
64 | + | if !ok { | |
65 | + | l.Warn("Cannot fetch table due to no support for its foreign ref", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id)) | |
66 | + | continue | |
67 | + | } | |
68 | + | ||
69 | + | var entityIds []string | |
70 | + | ||
71 | + | if ids, ok := cachedEntityIds[tableRef.ForeignTableName]; ok { | |
72 | + | entityIds = ids | |
73 | + | } else { | |
74 | + | entityIds, err := fOp.GetIdsForUser(tx, l, id) | |
75 | + | ||
76 | + | if err != nil { | |
77 | + | l.Error("Failed to get ids", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) | |
78 | + | continue | |
79 | + | } | |
80 | + | ||
81 | + | cachedEntityIds[tableRef.ForeignTableName] = entityIds | |
82 | + | } | |
83 | + | ||
84 | + | var fkeysNotAdded bool | |
85 | + | if _, ok := collectedData[tableRef.ForeignTableName]; !ok { | |
86 | + | fkeysNotAdded = true | |
87 | + | } | |
88 | + | ||
89 | + | // Handle the entities now | |
90 | + | for _, entityId := range entityIds { | |
91 | + | l.Info("Fetching table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId)) | |
92 | + | rows, err := fOp.Fetch(tx, l, tableRef.TableName, tableRef.ColumnName, entityId) | |
93 | + | ||
94 | + | if err != nil { | |
95 | + | l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) | |
96 | + | continue | |
97 | + | } | |
98 | + | ||
99 | + | for _, row := range rows { | |
100 | + | if _, ok := collectedData[tableRef.TableName]; !ok { | |
101 | + | collectedData[tableRef.TableName] = []any{} | |
102 | + | } | |
103 | + | ||
104 | + | collectedData[tableRef.TableName] = append(collectedData[tableRef.TableName], row) | |
105 | + | } | |
106 | + | ||
107 | + | if del { | |
108 | + | err = fOp.Delete(tx, l, tableRef.TableName, tableRef.ColumnName, entityId) | |
109 | + | ||
110 | + | if err != nil { | |
111 | + | l.Error("Failed to delete table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) | |
112 | + | } | |
113 | + | } | |
114 | + | ||
115 | + | // Fetch the foreign table's data | |
116 | + | if fkeysNotAdded { | |
117 | + | l.Info("Fetching foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.String("entityId", entityId)) | |
118 | + | rows, err := fOp.Fetch(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId) | |
119 | + | ||
120 | + | if err != nil { | |
121 | + | l.Error("Failed to fetch table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) | |
122 | + | continue | |
123 | + | } | |
124 | + | ||
125 | + | for _, row := range rows { | |
126 | + | if _, ok := collectedData[tableRef.ForeignTableName]; !ok { | |
127 | + | collectedData[tableRef.ForeignTableName] = []any{} | |
128 | + | } | |
129 | + | ||
130 | + | collectedData[tableRef.ForeignTableName] = append(collectedData[tableRef.ForeignTableName], row) | |
131 | + | } | |
132 | + | ||
133 | + | if del { | |
134 | + | err = fOp.Delete(tx, l, tableRef.ForeignTableName, tableRef.ForeignColumnName, entityId) | |
135 | + | ||
136 | + | if err != nil { | |
137 | + | l.Error("Failed to delete foreign table", zap.String("table", tableRef.TableName), zap.String("foreignTable", tableRef.ForeignTableName), zap.String("column", tableRef.ColumnName), zap.String("id", id), zap.Error(err)) | |
138 | + | } | |
139 | + | } | |
140 | + | } | |
141 | + | } | |
142 | + | } | |
143 | + | ||
144 | + | // Delete from psql user_cache if `del` is true | |
145 | + | if del { | |
146 | + | for _, dovewingPlatform := range []*dovewing.DiscordState{state.DovewingPlatformDiscord} { | |
147 | + | l.Info("Deleting from user_cache [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName())) | |
148 | + | res, err := dovewing.ClearUser(state.Context, id, dovewingPlatform, dovewing.ClearUserReq{}) | |
149 | + | ||
150 | + | if err != nil { | |
151 | + | l.Error("Error clearing user [dovewing]", zap.Error(err), zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName())) | |
152 | + | } | |
153 | + | ||
154 | + | l.Info("Cleared user [dovewing]", zap.String("id", id), zap.String("platform", dovewingPlatform.PlatformName()), zap.Any("res", res)) | |
155 | + | } | |
156 | + | } | |
157 | + | ||
158 | + | finalOutput := map[string]any{ | |
159 | + | "data": collectedData, | |
160 | + | "meta": map[string]any{ | |
161 | + | "request_ip": ip, | |
162 | + | }, | |
163 | + | } | |
164 | + | ||
165 | + | _, err = tx.Exec(state.Context, "UPDATE tasks SET output = $1, state = $2 WHERE task_id = $3", finalOutput, "completed", taskId) | |
166 | + | ||
167 | + | if err != nil { | |
168 | + | l.Error("Failed to update task", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
169 | + | return | |
170 | + | } | |
171 | + | ||
172 | + | err = tx.Commit(state.Context) | |
173 | + | ||
174 | + | if err != nil { | |
175 | + | l.Error("Failed to commit transaction", zap.Error(err), zap.String("id", id), zap.Bool("del", del)) | |
176 | + | return | |
177 | + | } | |
178 | + | ||
179 | + | done = true | |
180 | + | } |
Newer
Older