This is a transaction execution simulation based on a dependency graph. I used coroutines to execute in parallel. The dependency graph records whether there is a data dependency relationship between the execution of transactions. Transactions without dependencies can be executed in parallel. But I encountered a problem. The program was blocked during testing. When I debugged, I found that the modification of monitorResult
and duplicateG
in the goroutine would not be successful, resulting in an infinite loop without changing the size of duplicateG
. How to solve this problem?
ackage peers
import (
"fmt"
"sync"
"time"
)
type Asset struct {
AppraisedValue int
ID int
version int
}
type TxReadWriteSet struct {
Reads KVRead
Writes KVWrite
}
// read set
type KVRead struct {
Key int
}
// write set
type KVWriteType int
const (
SET KVWriteType = iota
DEL
INC
)
// KVWrite
type KVWrite struct {
Key int
Type KVWriteType
Value int
}
type singleTransaction struct {
TransactionID int
TxReadWriteSet TxReadWriteSet
TransactionTime int32
}
type Graph struct {
edges map[int][]int
nodes map[int]bool
}
func SimulateWithGraph(transactions []singleTransaction, graph Graph) ([]Asset, []singleTransaction, []singleTransaction) {
var successfulTransactions []singleTransaction
var errorTransactions []singleTransaction
monitorResult := GetWorldState()
duplicateG := graph
// Create a wait group to wait for all goroutines to complete
var wg sync.WaitGroup
var mutex sync.Mutex
// Creates a channel for receiving successful and erroneous trades
successfulCh := make(chan singleTransaction)
errorCh := make(chan singleTransaction)
// Transactions are executed in topologically sorted order
for len(duplicateG.nodes) > 0 {
// Gets a node with an entry degree of 0
queue := getZeroInDegreeNodes(graph)
// Execute transactions for nodes with a degree of 0
txns := getTransaction(queue, transactions)
for _, txn := range txns {
wg.Add(1)
go func(txn singleTransaction) {
read := txn.TxReadWriteSet.Reads
startVersion := monitorResult[read.Key].version
// Check the write operation
mutex.Lock()
write := txn.TxReadWriteSet.Writes
if write.Type == DEL && monitorResult[write.Key].AppraisedValue < write.Value {
errorCh <- txn
return
}
switch write.Type {
case INC:
monitorResult[write.Key].AppraisedValue += write.Value
case DEL:
monitorResult[write.Key].AppraisedValue -= write.Value
case SET:
monitorResult[write.Key].AppraisedValue = write.Value
}
//Check that the version numbers are consistent before committing
if monitorResult[read.Key].version != startVersion {
mutex.Unlock()
errorCh <- txn
return
}
monitorResult[write.Key].version++
mutex.Unlock()
successfulCh <- txn
// Deletes the executed node
delete(duplicateG.nodes, txn.TransactionID)
delete(duplicateG.edges, txn.TransactionID)
wg.Done()
}(txn)
}
}
wg.Wait()
close(successfulCh)
close(errorCh)
// Collect successful and erroneous trades
for txn := range successfulCh {
successfulTransactions = append(successfulTransactions, txn)
}
for txn := range errorCh {
errorTransactions = append(errorTransactions, txn)
}
return monitorResult, successfulTransactions, errorTransactions
}
The nodes with 0 indegree in dependency graph should modify different asset values so I was wondering should I add a mutex and how to do it